Commit a808572c authored by Elijah Andrews's avatar Elijah Andrews

Added supporting code for slot geometries paper.

parent 23748f0b
import math
import numpy as np
import scipy.linalg
import util.gen_utils as gen
from util.vector_utils import mag
def get_vel(point, centroids, areas, source_densities, bubble_pos=None, m_0=1):
"""
Calculate the fluid velocity at a point.
:param point: 3D point, [x, y, z]
:param centroids:
:param areas:
:param source_densities:
:param bubble_pos: Bubble position vector, if None bubble is not included.
:param m_0:
:return:
"""
s_mat = np.array(source_densities)
if len(s_mat.shape) == 1:
s_mat = np.expand_dims(s_mat, 1)
ps, qs = np.array(centroids), np.array(point)
a_mat = np.expand_dims(areas, 1)
dif_mat = np.subtract(ps, qs, dtype=np.float32)
r_mat = np.expand_dims(np.linalg.norm(dif_mat, axis=1), 1)
res_mat = np.divide(s_mat * a_mat * dif_mat, (4 * np.pi * r_mat ** 3), where=r_mat != 0)
boundary_vel_sum = np.sum(res_mat, 0)
if bubble_pos is None:
vel = boundary_vel_sum
else:
point = np.array(point)
bubble_pos = np.array(bubble_pos)
bubble_vel = -(point - bubble_pos) * m_0 / (
4 * np.pi * ((point[0] - bubble_pos[0]) ** 2
+ (point[1] - bubble_pos[1]) ** 2
+ (point[2] - bubble_pos[2]) ** 2) ** (3 / 2))
vel = boundary_vel_sum + bubble_vel
return vel
# @profile
def get_R_matrix(centroids, normals, areas, dtype=np.float32):
# Expand centroids into two n x n matrices.
ps, qs = np.broadcast_arrays(np.expand_dims(centroids, 1), np.expand_dims(centroids, 0))
# Expand normals into an n x n x 3 matrix.
n_mat = np.broadcast_to(np.expand_dims(normals, 1), (len(ps), len(qs), 3))
# Expand areas into an n x n matrix.
a_mat = np.broadcast_to(np.expand_dims(areas, 0), (len(ps), len(qs)))
# p - q
res_mat = np.subtract(ps, qs, dtype=dtype) # dif_mat
# |p - q| , equals zero if p = q
r_mat = np.linalg.norm(res_mat, axis=2)
# n . (p - q)
res_mat = np.einsum('...k,...k', n_mat, res_mat) # n_dot_dif
# a_q * (n_p . (p - q)) / (4 * pi * |p - q| ^ 3)
res_mat = np.divide(a_mat * res_mat, (4 * np.pi * r_mat ** 3), where=r_mat != 0, dtype=dtype)
# Set diagonal to value obtained from integrating over the singularity in a panel (the effect on itself).
np.fill_diagonal(res_mat, 0.5)
return res_mat
def get_R_factor(arr_p, arr_q):
"""
:param arr_p: [p, (centroid, normal)]
:param arr_q: [q, (centroid, area)]
:return: R_factor
"""
p = arr_p[0]
p_cent = arr_p[1][0]
p_norm = arr_p[1][1]
q = arr_q[0]
q_cent = arr_q[1][0]
q_area = arr_q[1][1]
if p != q:
r = p_cent - q_cent
return q_area * np.dot(p_norm, r) / (4 * np.pi * mag(r) ** 3)
# return 0
else:
return 0.5
def get_R_vector(point, centroids, normals):
ps, qs = np.broadcast_arrays(np.expand_dims(centroids, 1), np.expand_dims(point, 0))
n_mat = np.broadcast_to(np.expand_dims(normals, 1), (len(centroids), 1, 3))
dif_mat = np.subtract(ps, qs, dtype=np.float64)
n_dot_dif = np.einsum('...k,...k', n_mat, dif_mat)
r_mat = np.linalg.norm(dif_mat, axis=2)
res_mat = np.divide(n_dot_dif, (4 * np.pi * r_mat ** 3), where=r_mat != 0)
return res_mat
def calculate_sigma(bubble_pos, centroids, normals, areas, m_0, R_inv=None, R_b=None) -> np.ndarray:
if R_b is None:
R_b = get_R_vector(bubble_pos, centroids, normals)
if R_inv is None:
R = get_R_matrix(centroids, normals, areas)
# sigma = gauss_seidel(R, -m_0 * R_b, max_res=1e-12)
sigma = scipy.linalg.solve(R, -m_0 * R_b) # Faster to do this than inverting the matrix
# sigma = svd_solve(R, -m_0 * R_b)
else:
sigma = -m_0 * np.dot(R_inv, R_b)
return sigma
def get_jet_dir_and_sigma(bubble_pos, centroids, normals, areas, m_0=1, R_inv=None, R_b=None) \
-> [np.ndarray, np.ndarray]:
sigma = calculate_sigma(bubble_pos, centroids, normals, areas, m_0, R_inv, R_b)
vel = get_vel(bubble_pos, centroids, areas, sigma)
return vel, sigma
def get_jet_dirs(bubble_pos_list, centroids, normals, areas, m_0=1, R_inv=None, R_b=None, verbose=False):
vels = np.empty((len(bubble_pos_list), 3))
for i, pos in enumerate(bubble_pos_list):
if verbose:
print(f"{100 * i / len(bubble_pos_list):.2f}% complete...")
vels[i] = get_jet_dir_and_sigma(pos, centroids, normals, areas, m_0=m_0, R_inv=R_inv, R_b=R_b)[0]
return vels
def test_run_analysis():
centroids, normals, areas = gen.gen_slot(500)
bubble = np.array([0, 1, 0])
m_0 = 1
R_b = get_R_vector(bubble, centroids, normals)
R_mat = get_R_matrix(centroids, normals, areas)
R_inv = np.linalg.inv(R_mat)
source_densities = calculate_sigma(bubble, centroids, normals, areas, m_0, R_inv, R_b)
res_vel = get_vel(bubble, centroids, areas, source_densities, m_0=m_0)
print("Resultant velocity = ", res_vel)
assert (np.all([math.isclose(res_vel[0], 0, abs_tol=1e-16), math.isclose(res_vel[2], 0, abs_tol=1e-16)]))
assert (res_vel[1] < 0)
import numpy as np
import math
import util.gen_utils as gen
import bem as bem
import matplotlib.pyplot as plt
from util.plotting_utils import plot_3d_point_sets
import os
import util.file_utils as file
import scipy.sparse
W = 2
H = 3
Ys = [2]
x_lim = 8
Xs = np.linspace(-x_lim * W / 2, x_lim * W / 2, 100)
save_to_file = False
calculate_error = False
normalise = False
show_plot = True
if not (save_to_file or show_plot):
raise ValueError("No output selected for model.")
m_b = 1
n = 5000
density_ratio = 0.25
w_thresh = 10
out_dir = "../model_outputs/exp_comparisons/"
if not os.path.exists(out_dir) and save_to_file:
os.makedirs(out_dir)
centroids, normals, areas = gen.gen_varied_slot(n=n, H=H, W=W, length=100, depth=50, w_thresh=w_thresh,
density_ratio=density_ratio)
print("Requested n = {0}, using n = {1}.".format(n, len(centroids)))
plot_3d_point_sets([centroids])
R_matrix = bem.get_R_matrix(centroids, normals, areas, dtype=np.float32)
R_inv = scipy.linalg.inv(R_matrix)
condition_number_1 = np.linalg.norm(R_inv, 1) * np.linalg.norm(R_matrix, 1)
condition_number_inf = np.linalg.norm(R_inv, np.inf) * np.linalg.norm(R_matrix, np.inf)
print(f"Condition numbers: 1 norm = {condition_number_1}, inf norm = {condition_number_inf}")
for Y in Ys:
if calculate_error:
thetas = []
for X in Xs:
print("Testing X =", X)
R_b = bem.get_R_vector([X, Y, 0], centroids, normals)
res_vel, sigma = bem.get_jet_dir_and_sigma([X, Y, 0], centroids, normals, areas, m_0=m_b, R_inv=R_inv,
R_b=R_b)
theta = math.atan2(res_vel[1], res_vel[0]) + math.pi / 2
thetas.append(theta)
print(" theta =", theta)
residual = np.abs(m_b * R_b + np.dot(R_matrix, sigma))
max_err = condition_number_inf * np.linalg.norm(residual, np.inf) / np.linalg.norm(m_b * R_b, np.inf)
print(f" Max res = {np.max(residual):.3e}, Mean res = {np.mean(residual):.3e},"
f" Max err = {max_err:.3e}")
else:
points = np.empty((len(Xs), 3))
points[:, 0] = Xs
points[:, 1] = Y
points[:, 2] = 0
vels = bem.get_jet_dirs(points, centroids, normals, areas, m_b, R_inv, verbose=True)
thetas = np.arctan2(vels[:, 1], vels[:, 0]) + 0.5 * np.pi
xs = Xs / (0.5 * W)
if normalise:
theta_star, x_star = sorted(zip(thetas, xs))[-1]
xs = xs / x_star
thetas = thetas / theta_star
if show_plot:
fig = plt.figure()
fig.patch.set_facecolor('white')
ax = plt.gca()
ax.plot(xs, thetas)
ax.set_xlabel("$x$")
ax.set_ylabel("$\\theta$ (rad)")
ax.axvline(x=-1, linestyle='--', color='gray')
ax.axvline(x=1, linestyle='--', color='gray')
plt.show()
if save_to_file:
file_path = f"W{W:.2f}H{H:.2f}Y{Y:.2f}_bem_slot_prediction_{n}_{density_ratio}_{w_thresh}.csv" \
if not normalise \
else f"W{W:.2f}H{H:.2f}Y{Y:.2f}_bem_slot_prediction_{n}_{density_ratio}_{w_thresh}_normalised.csv"
alph = "abcdefgh"
i = 0
while os.path.exists(out_dir + file_path):
file_path = f"W{W:.2f}H{H:.2f}Y{Y:.2f}_bem_slot_prediction_{n}_{density_ratio}_{w_thresh}" \
f"{alph[i]}.csv"
i += 1
headers = ["x", "theta"] if not normalise else ["x_hat", "theta_hat"]
file.lists_to_csv(out_dir, file_path, [xs, thetas], headers=headers)
import os
import numpy as np
from PyQt5.QtWidgets import QFileDialog, QApplication
def lists_to_csv(file_dir, filename, lists, headers=None, overwrite=False):
"""
Writes a list of lists to a CSV file where each list is a column
:param file_dir: The directory in which to save the CSV file.
:param filename: The name of the CSV file (including file extension).
:param lists: The lists to save.
:param headers: A list of header strings, one for each column, defaults to no headers.
:param overwrite: Whether to overwrite an existing file, defaults to false.
:return: Success of file write.
"""
if os.path.exists(file_dir + filename) and not overwrite:
print(f"Warning: file already exists. {file_dir + filename}")
return False
if file_dir != "":
os.makedirs(file_dir, exist_ok=True)
file = open(file_dir + filename, "w")
if headers:
line = ""
for h in headers:
line += h + ","
line = line[:-1] + "\n"
file.write(line)
zipped_lists = np.transpose(lists)
for entry in zipped_lists:
line = ""
for part in entry:
line += str(part) + ","
line = line[:-1] + "\n"
file.write(line)
file.close()
return True
def csv_to_lists(file_dir, filename, has_headers=False):
"""
Reads a CSV file to a list of lists where each column becomes a list.
:param file_dir: The directory from which to read the CSV file.
:param filename: The name of the CSV file (including file extension).
:param has_headers: Whether the CSV file has headers, defaults to False.
:return: List of lists.
"""
file = open(file_dir + filename, "r")
lines = file.readlines()
start_idx = 1 if has_headers else 0
zipped_lists = []
for line in lines[start_idx:]:
line = line.strip(',\n')
part_arr = []
for part in line.split(","):
part_arr.append(float(part))
zipped_lists.append(part_arr)
return np.transpose(zipped_lists)
def select_file(start_path, create_window=True):
"""
Opens a file selection dialog.
:param start_path: The path at which to open the dialog.
:param create_window: Whether to create a new QApplication. May need to be set to false if QT is used elsewhere,
such as in matplotlib.
:return: String of the full file path.
"""
if create_window:
window = QApplication([])
file_path = str(QFileDialog.getOpenFileName(None, "Select File", start_path)[0])
return file_path
def select_multiple_files(start_path, create_window=True):
"""
Opens a multiple file selection dialog.
:param start_path: The path at which to open the dialog.
:param create_window: Whether to create a new QApplication. May need to be set to false if QT is used elsewhere,
such as in matplotlib.
:return: A list of full file path strings.
"""
if create_window:
window = QApplication([])
file_paths = QFileDialog.getOpenFileNames(None, "Select File", start_path)[0]
return file_paths
def select_dir(start_path, create_window=True):
"""
Opens a directory selection dialog.
:param start_path: The path at which to open the dialog.
:param create_window: Whether to create a new QApplication. May need to be set to false if QT is used elsewhere,
such as in matplotlib.
:return: A list of full file path strings.
"""
if create_window:
window = QApplication([])
dir_path = str(QFileDialog.getExistingDirectory(None, "Select Directory", start_path)) + "/"
return dir_path
This diff is collapsed.
import matplotlib as mpl
import matplotlib.cm as cm
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap
from mpl_toolkits.mplot3d import Axes3D
import numpy as np
def plot_frame(frame, pos=None, show_immediately=True):
plt.figure()
plt.imshow(frame, cmap=plt.cm.gray)
if pos is not None:
plt.plot(pos[0], pos[1], 'rx')
plt.xticks([])
plt.yticks([])
if show_immediately:
plt.show()
def initialize_plt(font_size=10, line_scale=1, capsize=3):
plt.rc('text', usetex=True)
font = {'family': 'serif', 'size': font_size, 'serif': ['cmr10']}
plt.rc('font', **font)
plt.rc('lines', linewidth=line_scale, markersize=3 * line_scale)
plt.rc('axes', linewidth=0.5)
plt.rc('patch', linewidth=0.5 * line_scale)
plt.rc('errorbar', capsize=capsize)
color_dict = {'red': ((0.0, 0.0, 0.0),
(0.9, 0.5, 1.0),
(1.0, 1.0, 1.0)),
'green': ((0.0, 0.0, 0.0),
(0.5, 0.0, 0.0),
(1.0, 0.0, 0.0)),
'blue': ((0.0, 0.0, 0.0),
(0.5, 0.0, 0.0),
(1.0, 0.0, 0.0))}
heatmap_cm = LinearSegmentedColormap('heatmap', color_dict)
def set_axes_radius(ax, origin, radius):
""" https://stackoverflow.com/a/50664367/5270376 """
ax.set_xlim3d([origin[0] - radius, origin[0] + radius])
ax.set_ylim3d([origin[1] - radius, origin[1] + radius])
ax.set_zlim3d([origin[2] - radius, origin[2] + radius])
def set_axes_equal(ax):
"""
https://stackoverflow.com/a/50664367/5270376
Make axes of 3D plot have equal scale so that spheres appear as spheres,
cubes as cubes, etc.. This is one possible solution to Matplotlib's
ax.set_aspect('equal') and ax.axis('equal') not working for 3D.
Input
ax: a matplotlib axis, e.g., as output from plt.gca().
"""
limits = np.array([
ax.get_xlim3d(),
ax.get_ylim3d(),
ax.get_zlim3d(),
])
origin = np.mean(limits, axis=1)
radius = 0.5 * np.max(np.abs(limits[:, 1] - limits[:, 0]))
set_axes_radius(ax, origin, radius)
def plot_3d_points(points, c=None, cmap_name="RdBu", center_cmap=True):
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d') # type: Axes3D
points = np.array(points)
if c is not None:
if center_cmap:
scat = ax.scatter3D(points[:, 0], points[:, 2], points[:, 1], c=c, cmap=cm.get_cmap(cmap_name),
vmin=-np.max(np.abs(c)), vmax=np.max(np.abs(c)))
else:
scat = ax.scatter3D(points[:, 0], points[:, 2], points[:, 1], c=c, cmap=cm.get_cmap(cmap_name))
fig.colorbar(scat)
else:
ax.scatter3D(points[:, 0], points[:, 2], points[:, 1])
ax.set_xlabel('X')
ax.set_ylabel('Z')
ax.set_zlabel('Y')
lim = [min([ax.get_xlim()[0], ax.get_ylim()[0], ax.get_zlim()[0]]),
max([ax.get_xlim()[1], ax.get_ylim()[1], ax.get_zlim()[1]])]
ax.set_xlim(lim)
ax.set_ylim(lim)
ax.set_zlim(lim)
plt.show()
def plot_2d_point_sets(point_sets):
colors = ["r", "b", "g", "orange", "k", "yellow"]
fig = plt.figure()
ax = fig.add_subplot(111, aspect='equal')
for i in range(len(point_sets)):
points = np.array(point_sets[i])
ax.scatter(points[:, 0], points[:, 1], c=colors[i])
ax.set_xlabel('X')
ax.set_ylabel('Y')
lim = [min([ax.get_xlim()[0], ax.get_ylim()[0]]),
max([ax.get_xlim()[1], ax.get_ylim()[1]])]
ax.set_xlim(lim)
ax.set_ylim(lim)
plt.show()
def plot_3d_point_sets(point_sets):
colors = ["r", "b", "g", "orange", "k", "yellow"]
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d') # type: Axes3D
for i in range(len(point_sets)):
points = np.array(point_sets[i])
ax.scatter3D(points[:, 0], points[:, 2], points[:, 1], c=colors[i])
ax.set_xlabel('X')
ax.set_ylabel('Z')
ax.set_zlabel('Y')
# lim = [min([ax.get_xlim()[0], ax.get_ylim()[0], ax.get_zlim()[0]]),
# max([ax.get_xlim()[1], ax.get_ylim()[1], ax.get_zlim()[1]])]
# ax.set_xlim(lim)
# ax.set_ylim(lim)
# ax.set_zlim(lim)
set_axes_equal(ax)
plt.show()
def plot_heatmap(points, x_bins=100, mode='smooth', filename=None, cmap_name='viridis'):
"""
Generate a heatmap plot of a number of points.
:param points: The points to use in the heatmap.
:param x_bins: The number of bins along the x-axis.
:param mode: 'smooth' for smooth bin edges, 'sharp' for sharp bin edges.
:param filename: The filename (and path) to save the plot, does not save if None.
"""
points = np.array(points)
x = points[:, 0]
y = points[:, 1]
x_dim = abs(np.ptp(x))
y_dim = abs(np.ptp(y))
# Generate heatmap
y_bins = int(round(x_bins * y_dim / x_dim))
if y_bins == 0:
y_bins = 1
heatmap, xedges, yedges = np.histogram2d(x, y, bins=(x_bins, y_bins))
extent = [xedges[0] - 1, xedges[-1] + 1, yedges[0] - 1, yedges[-1] + 1]
heatmap = heatmap.T
fig = plt.figure()
if abs(yedges[-1] - yedges[0]) < 0.025:
yedges[0] = -0.025 # Ensure that the data is actually visible.
if mode == 'smooth':
ax = fig.add_subplot(111, aspect='equal', xlim=xedges[[0, -1]], ylim=yedges[[0, -1]])
im = mpl.image.NonUniformImage(ax, interpolation='bilinear')
xcenters = (xedges[:-1] + xedges[1:]) / 2
ycenters = (yedges[:-1] + yedges[1:]) / 2
im.set_data(xcenters, ycenters, heatmap)
ax.images.append(im)
elif mode == 'sharp':
ax = fig.add_subplot(111, aspect='equal')
X, Y = np.meshgrid(xedges, yedges)
cmap = None
if cmap_name == "heatmap":
cmap = heatmap_cm
else:
cmap = cm.get_cmap(cmap_name)
ax.pcolormesh(X, Y, heatmap, cmap=cmap)
else:
raise (ValueError("{0} is not a valid mode.".format(mode)))
if filename is not None:
plt.savefig(filename)
plt.show()
if __name__ == "__main__":
this_points = [[0, 1], [0, 2], [2, 3], [1, 3.5]]
plot_heatmap(this_points, x_bins=5, mode='sharp')
def mag(v):
if len(v) == 2:
return (v[0] ** 2 + v[1] ** 2) ** 0.5
if len(v) == 3:
return (v[0] ** 2 + v[1] ** 2 + v[2] ** 2) ** 0.5
def unit(v):
m = mag(v)
if len(v) == 2:
return [v[0] / m, v[1] / m]