Commit 5a228cf0 authored by Elijah Andrews

Improved slot analysis code: SweepData can now be populated directly from a CSV file, previously hard-coded data/prediction/output directories are exposed as parameters, per-sweep raw, mean, and shifted data are written to CSV, bubble stand-off and size extremes are reported, and peak angle/position differences against BEM predictions are computed.

parent f1b4835f
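
A minimal usage sketch of the revised interface, assuming the module in this diff is importable as `slot_analysis` (that module name, the geometry label, and the sweep CSV directory/file below are illustrative placeholders, not part of the commit):

import matplotlib.pyplot as plt

from slot_analysis import SweepData, analyse_slot  # hypothetical module name

# New in this commit: a sweep can be populated directly from a CSV file via the
# optional sweep_dir/sweep_file constructor arguments (paths below are assumed).
sweep = SweepData("W2H3", m_y=2.0, W=2.0, H=3.0,
                  sweep_dir="sweeps/raw_data/",
                  sweep_file="raw_data_sweep_W2H3_Y2.00.csv")
print(sweep)  # e.g. "W2H3: y=2.0, q=2.00"

# New in this commit: the previously hard-coded directories are keyword
# arguments of analyse_slot (defaults shown here match the diff).
fig, ax = plt.subplots()
analyse_slot(ax,
             data_dir="../../../../Data/SlotSweeps",
             prediction_file_dir="../../numerical/models/model_outputs/exp_comparisons/",
             sweep_save_dir="../../experimental/plotting/sweeps/")
plt.show()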
@@ -5,6 +5,7 @@ import sys
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.lines as mlines
import numpy as np
from scipy import stats
from sklearn.metrics import r2_score
@@ -30,7 +31,7 @@ class SweepData:
is_shifted = False # Whether the data has been shifted to correct for offset
Ys = None # Calculated y values.
def __init__(self, geometry_label, m_y, W, H):
def __init__(self, geometry_label, m_y, W, H, sweep_dir=None, sweep_file=None):
self.geometry_label = geometry_label
self.m_y = m_y
self.W = W
@@ -39,6 +40,8 @@ class SweepData:
self.xs = []
self.thetas = []
self.Ys = []
if sweep_dir and sweep_file:
self.add_points_from_csv(sweep_dir, sweep_file)
def __str__(self):
return f"{self.geometry_label}: y={self.m_y}, q={np.mean(self.Ys):.2f}"
@@ -50,6 +53,13 @@ class SweepData:
self.thetas.append(theta)
self.Ys.append(Y)
def add_points_from_csv(self, f_dir, f_name, has_headers=True):
""" Add all points from the specified CSV file to the sweep. """
xs, thetas = file.csv_to_lists(f_dir, f_name, has_headers=has_headers)
for x, theta in zip(xs, thetas):
m_x = np.mean([tx for tx in xs if np.abs(x - tx) < 0.05])
self.add_point(m_x, x, theta, self.m_y)
def get_grouped_data(self):
""" Groups data based on x position. """
m_xs_set = set(self.m_xs)
@@ -226,24 +236,22 @@ class SweepData:
return errors
def select_data_series(use_all_dirs=True, num_series=None, use_defaults=True, verbose=False, create_window=True):
def select_data_series(use_all_dirs=True, num_series=None, base_dir="../../../../Data/SlotSweeps",
verbose=False, create_window=True):
"""
Select a series of data using a GUI.
:param use_all_dirs: Select all directories within a parent directory.
:param num_series: The number of series to select (only used if use_all_dirs is false).
:param use_defaults: Just select the default directory (essentially a legacy feature).
:param base_dir: Directory containing experimental data.
:param verbose: Output debug information.
:param create_window: Whether PyQt5 creates a window (for odd conflicts with Matplotlib).
:return: Array of directory paths.
"""
dirs = []
if use_all_dirs:
if use_defaults:
root_dir = "../../../../Data/SlotSweeps"
else:
root_dir = file.select_dir("../../../../Data/SlotSweeps", create_window=create_window)
if root_dir == "/":
exit()
root_dir = file.select_dir(base_dir, create_window=create_window)
if root_dir == "/":
exit()
for root, _, files in os.walk(root_dir):
if "params.py" in files:
dirs.append(root + "/")
@@ -253,11 +261,12 @@ def select_data_series(use_all_dirs=True, num_series=None, use_defaults=True, ve
if num_series is None:
num_series = int(input("Number of data sets to load = "))
for i in range(num_series):
dirs.append(file.select_dir("../../../../Data/SlotSweeps", create_window=create_window))
dirs.append(file.select_dir(base_dir, create_window=create_window))
return dirs
def plot_prediction_file(prediction_file, ax, normalize=False, x_star=None, theta_star=None, label="Numerical", c='k'):
def plot_prediction_file(prediction_file, ax, normalize=False, x_star=None, theta_star=None, label="Numerical", c='k',
verbose=False):
"""
Plot a prediction file on a given axes.
:param prediction_file: Full prediction file path.
@@ -272,6 +281,8 @@ def plot_prediction_file(prediction_file, ax, normalize=False, x_star=None, thet
predicted_xs, predicted_thetas = file.csv_to_lists("", prediction_file, has_headers=True)
if x_star is None or theta_star is None:
x_star, theta_star = sorted(zip(predicted_xs, predicted_thetas), key=lambda k: k[1])[-1]
if verbose:
print(f"{prediction_file}, x_star={x_star:.4f}, theta_star={theta_star:.4f}")
if normalize:
predicted_xs = np.divide(predicted_xs, x_star)
predicted_thetas = np.divide(predicted_thetas, theta_star)
@@ -281,7 +292,8 @@ def plot_prediction_file(prediction_file, ax, normalize=False, x_star=None, thet
ax.set_xlim((x_min, x_max)) # Reset x limits to their original values.
def plot_prediction_files(prediction_files, ax, normalize=False, coloured_lines=True, x_stars=None, theta_stars=None):
def plot_prediction_files(prediction_files, ax, normalize=False, coloured_lines=True, x_stars=None, theta_stars=None,
verbose=False):
"""
Plot a series of prediction files on a given axes.
:param prediction_files: Array of full prediction file paths.
@@ -305,12 +317,14 @@ def plot_prediction_files(prediction_files, ax, normalize=False, coloured_lines=
c = 'k'
if coloured_lines:
c = f"C{i + 1}"
plot_prediction_file(f_name, ax, normalize, x_star, theta_star, label=label, c=c)
plot_prediction_file(f_name, ax, normalize, x_star, theta_star, label=label, c=c, verbose=verbose)
ax.set_xlim((x_min, x_max)) # Reset x limits to their original values.
def analyse_slot(ax, sweeps=None, set_y_label=True, set_x_label=True, use_defaults=False,
config=None, num_series=None):
config=None, num_series=None, sweep_save_dir="../../experimental/plotting/sweeps/",
prediction_file_dir="../../numerical/models/model_outputs/exp_comparisons/",
data_dir="../../../../Data/SlotSweeps", prediction_files=None):
"""
Function to plot experimental slot jet angle data.
:param ax: The axes on which to plot.
@@ -320,6 +334,9 @@ def analyse_slot(ax, sweeps=None, set_y_label=True, set_x_label=True, use_defaul
:param use_defaults: Whether to use the default configuration.
:param config: A configuration to use.
:param num_series: The number of data series to plot.
:param sweep_save_dir: The directory in which to save sweep data.
:param prediction_file_dir: The directory containing prediction files.
:param data_dir: The directory containing experimental data.
:return: The axes that were used (should be the same as the ax input).
"""
create_window = not mpl.get_backend() == "Qt5Agg"
@@ -365,15 +382,19 @@ def analyse_slot(ax, sweeps=None, set_y_label=True, set_x_label=True, use_defaul
confidence_interval = 0.99 # Set the confidence interval for the error bars.
std = 0.015085955056793596 # Set the standard deviation for the error bars (from error_statistics.py).
sweep_save_dir = "../../experimental/plotting/sweeps/"
prediction_file_dir = "../../numerical/models/model_outputs/exp_comparisons/"
prediction_files = []
if plot_predicted:
if plot_predicted and prediction_files is None:
prediction_files = file.select_multiple_files(prediction_file_dir, create_window=create_window)
min_vert = None
max_vert = None
min_hor = None
max_hor = None
min_rad = None
max_rad = None
min_edge = None
max_edge = None
if sweeps is None:
dirs = select_data_series(use_all_dirs, num_series, use_defaults, verbose, create_window)
dirs = select_data_series(use_all_dirs, num_series, data_dir, verbose, create_window)
###################
# Processing Data #
@@ -422,19 +443,36 @@ def analyse_slot(ax, sweeps=None, set_y_label=True, set_x_label=True, use_defaul
x = (pos_mm[0] - x_offset) / (0.5 * params.slot_width)
Y = pos_mm[1] - y_offset
sweep_data.add_point(reading.m_x, x, theta, Y)
radius = params.mm_per_px * np.sqrt(reading.max_bubble_area / np.pi)
hor = abs((pos_mm[0] - x_offset) / radius)
vert = abs((pos_mm[1] - y_offset) / radius)
edge = (25 - abs(pos_mm[0] - x_offset)) / radius
if min_hor is None or hor < min_hor:
min_hor = hor
if max_hor is None or hor > max_hor:
max_hor = hor
if min_vert is None or vert < min_vert:
min_vert = vert
if max_vert is None or vert > max_vert:
max_vert = vert
if min_rad is None or radius < min_rad:
min_rad = radius
if max_rad is None or radius > max_rad:
max_rad = radius
if min_edge is None or edge < min_edge:
min_edge = edge
if max_edge is None or edge > max_edge:
max_edge = edge
if np.mean(sweep_data.Ys) < 0:
continue # Ignore bubbles generated inside the slot.
sweeps.append(sweep_data)
_, mean_xs, mean_thetas, _ = sweep_data.get_mean_data()
errors = sweep_data.get_error_bars(confidence_interval, std)
file.lists_to_csv(sweep_save_dir + "all_data/",
f"all_data_sweep_{sweep_data.geometry_label}_Y{np.mean(sweep_data.Ys):.2f}.csv",
[sweep_data.xs, sweep_data.thetas],
headers=["x", "theta"])
file.lists_to_csv(sweep_save_dir + "mean_data/",
f"mean_sweep_{sweep_data.geometry_label}_Y{np.mean(sweep_data.Ys):.2f}.csv",
[mean_xs, mean_thetas, errors],
headers=["mean x", "mean theta", "99% confidence interval"])
print(f"Horizontal: min = {min_hor}, max = {max_hor}")
print(f"Vertical: min = {min_vert}, max = {max_vert}")
print(f"Diameter: min = {min_rad * 2}, max = {max_rad * 2}")
print(f"Edge: min = {min_edge}, max = {max_edge}")
num_rejected_sets = 0
print(f"Found {len(sweeps)} sweeps")
@@ -444,7 +482,10 @@ def analyse_slot(ax, sweeps=None, set_y_label=True, set_x_label=True, use_defaul
markers = [".", "v", "s", "x", "^", "+", "D", "1", "*", "P", "X", "4", "2", "<", "3", ">", "H", "o", "p", "|"]
if len(markers) < len(sweeps):
raise ValueError("Too few markers are available for the data sets.")
theta_star_pct_difs = []
x_star_pct_difs = []
x_offsets = []
theta_offsets = []
#######################
# Plotting Sweep Data #
#######################
@@ -469,17 +510,61 @@ def analyse_slot(ax, sweeps=None, set_y_label=True, set_x_label=True, use_defaul
min_fitted_peak_p, min_fitted_peak, min_poly_coeffs,
range_fact, verbose) or is_bad_data
file.lists_to_csv(sweep_save_dir + "raw_data/",
f"raw_data_sweep_{sweep.geometry_label}_Y{np.mean(sweep.Ys):.2f}.csv",
[sweep.xs, sweep.thetas],
headers=["x", "theta"])
file.lists_to_csv(sweep_save_dir + "mean_data/",
f"mean_sweep_{sweep.geometry_label}_Y{np.mean(sweep.Ys):.2f}.csv",
[mean_xs, means, y_errs],
headers=["mean x", "mean theta", "99% confidence interval"])
x_star, x_offset, theta_star, theta_offset = sweep.shift_data(max_fitted_peak_p,
max_fitted_peak,
min_fitted_peak_p,
min_fitted_peak,
keep_shifted=do_shift)
m_x_set, mean_xs, means, _ = sweep.get_mean_data() # Also update here for file saving.
if do_shift:
file.lists_to_csv(sweep_save_dir + "shifted_data/",
f"shifted_data_sweep_{sweep.geometry_label}_Y{np.mean(sweep.Ys):.2f}.csv",
[sweep.xs, sweep.thetas],
headers=["x", "theta"])
file.lists_to_csv(sweep_save_dir + "shifted_mean_data/",
f"mean_sweep_{sweep.geometry_label}_Y{np.mean(sweep.Ys):.2f}.csv",
[mean_xs, means, y_errs],
headers=["mean x", "mean theta", "99% confidence interval"])
w_threshs = [5, 8, 15]
for w_thresh in w_threshs:
try:
pred_file_name = f"W{sweep.W:.2f}H{sweep.H:.2f}Y{np.mean(sweep.Ys):.2f}" \
f"_bem_slot_prediction_20000_0.25_{w_thresh}.csv"
pred_xs, pred_thetas = file.csv_to_lists(prediction_file_dir,
pred_file_name,
has_headers=True)
pred_max_theta, pred_max_x = sorted(zip(pred_thetas, pred_xs))[-1]
theta_star_pct_dif = 100 * (theta_star - pred_max_theta) / theta_star
theta_star_pct_difs.append(theta_star_pct_dif)
x_star_pct_dif = 100 * (x_star - pred_max_x) / x_star
x_star_pct_difs.append(x_star_pct_dif)
if verbose:
print(sweep.geometry_label, f"Y{np.mean(sweep.Ys):.2f}", "theta % difference =",
f"{theta_star_pct_dif:.2f}, x % difference = {x_star_pct_dif:.2f}")
break
except FileNotFoundError:
continue
m_x_set, mean_xs, means, _ = sweep.get_mean_data()
if abs(theta_offset) > 0.05: # Any larger than this would mean very noticeable tilt of the frame.
print(f"WARNING: Large jet angle offset detected on {sweep.geometry_label}:{sweep.m_y}."
f" Y={np.mean(sweep.Ys)}, theta_offset={theta_offset:.5f}")
is_bad_data = True
else:
x_offsets.append(x_offset)
theta_offsets.append(theta_offset)
Y = np.mean(sweep.Ys)
if verbose:
@@ -567,7 +652,7 @@ def analyse_slot(ax, sweeps=None, set_y_label=True, set_x_label=True, use_defaul
ax.scatter(sweep.xs, sweep.thetas, marker=marker, color=c, zorder=zorder)
if verbose:
print(f"{sweep.geometry_label}:{sweep.m_y}, Mean Y = {Y:.3f}\n")
if len(sweeps) == len(prediction_files):
if prediction_files is not None and len(sweeps) == len(prediction_files):
label = None
if len(sweeps) == 1:
label = "Numerical"
@@ -575,17 +660,23 @@ def analyse_slot(ax, sweeps=None, set_y_label=True, set_x_label=True, use_defaul
else:
n_c = c
plot_prediction_file(prediction_files[i], ax, normalize, x_star=x_star, theta_star=theta_star, label=label,
c=n_c)
c=n_c, verbose=verbose)
print(f"Number of rejected data sets = {num_rejected_sets}")
if len(theta_star_pct_difs) > 0 and len(x_star_pct_difs) > 0:
print(f"Mean % theta_star difference = {np.mean(theta_star_pct_difs):.2f} "
f"for {len(theta_star_pct_difs)} sweeps.")
print(f"Mean % x_star difference = {np.mean(x_star_pct_difs):.2f} for {len(x_star_pct_difs)} sweeps.")
print(f"Mean absolute theta offset = {np.mean(np.abs(theta_offsets)):.3f}")
print(f"Mean absolute x offset = {np.mean(np.abs(x_offsets)):.3f}")
#########################
# General plot features #
#########################
# Plot predictions.
if plot_predicted and len(sweeps) != len(prediction_files):
plot_prediction_files(prediction_files, ax, normalize, not colours)
plot_prediction_files(prediction_files, ax, normalize, not colours, verbose=verbose)
# Set axis labels.
if normalize:
@@ -643,5 +734,5 @@ if __name__ == "__main__":
# exp_line = mlines.Line2D([], [], color="black", linestyle=" ", marker=".", label='Experimental')
# pred_line = mlines.Line2D([], [], color="C1", label='Numerical', linewidth=1)
# this_ax.legend(handles=[exp_line, pred_line], frameon=False, loc='lower left')
# plt.tight_layout()
plt.tight_layout()
plt.show()