Forked from
jp7g21 / flash-flood
Up to date with the upstream repository.
flood.py 13.82 KiB
from sympy import *
from matplotlib import pyplot as plt
from scipy import integrate as sp_int
from scipy import optimize
import numpy as np
"""
Plot a sympy expression f(x_sym) with respect to variable x_sym.
x_sym in range lim = [low, high].
N is the number of sample points.
"""
def plot_expr(ax, f, x_sym, lim, N):
F = lambdify(x_sym, f)
x = []
y = []
for i in range(N):
xi = i * (lim[1]-lim[0]) / (N-1) + lim[0]
x.append(xi)
y.append(F(xi))
ax.plot(x, y)
"""
Derive an analytical expression for the wetted perimeter of the river.
The width of the river at height y above the riverbed is given by w, which is
a sympy expression in terms of y. A_sym is a symbol representing the area filled
with water. The expression returned from this function will be in terms of A_sym.
"""
def derive_wetted_perim(w, y, A_sym, debug=False):
# Define symbols and functions
l = Function("l", positive=True, real=True)(A_sym)
h_sym = h = Symbol("h", positive=True, real=True)
# Calculate length by integrating segment lengths between 0 and h.
# Add the width at the base to the total length for the case of a flat bottom.
dx_dy = diff(w/2, y) # x = +- w/2
dl_dy = 2 * sqrt(dx_dy**2 + 1) # 2* for both left and right side of channel
l = integrate(dl_dy, (y, 0, h)) + w.subs(y, 0)
if debug:
print("l(h) =", l)
# Calculate channel area by integrating width wrt vertical coordinate.
A = simplify(integrate(w, (y, 0, h)))
if debug:
print("A(h) =", A)
# Get h in terms of A (there may be multiple solutions)
h_of_A_arr = solve(A_sym - A, h)
# For each solution...
for h_of_A in h_of_A_arr:
if debug:
print("================")
print("h(A) =")
pprint(h_of_A)
# Substitute the expression for h(A) into l(h) to get l(A)
l_of_A = simplify(l.subs(h, h_of_A))
if debug:
print("l(A) =")
pprint(l_of_A)
return l_of_A
"""
Returns the rate of change of c using Godunov method, for use in an ODE solver.
Q is a python function taking c (i.e. A in our case) as a parameter and calculating
the flux. delta is the distance timestep along the length of the river. c an array
containing the current conservation law parameter at each unit length. c_inflow is a
scalar giving the inflow at the left side of the river.
"""
def godunov_step(Q, delta, c, c_inflow):
N = len(c)
dc_dt = [-(Q(c[0]) - Q(c_inflow))/delta]
for i in range(1, N):
dc_dt.append(-(Q(c[i]) - Q(c[i-1]))/delta)
return dc_dt
"""
Solves a conservation problem using the Godunov method, returning a list-of-lists, where
each list is the value of c for all x-steps, at a different time.
Q is a python function taking c and calculating the flux. c_init is a list containing
the initial conservation variable at each length step. c_inflow is the value of c on
the left. delta is the x-step. t is a list giving the time values for which the ODE
solution should be given.
"""
def godunov_solve(Q, c_init, c_inflow, delta, t):
gstep = lambda c, t: godunov_step(Q, delta, c, c_inflow)
return sp_int.odeint(gstep, c_init, t)
"""
Division that can handle x/0
"""
def safe_div(a,b):
if abs(b) < 1e-9:
if b < 0:
return -a/1e-9
else:
return a/1e-9
else:
return a/b
"""
Takes a single timestep calculating the propagation of a shock, returning the rate of
change of the current state vector y.
y is the current parameter vector: [x, x0L, x0R], where x is the current x-position
of the shock, x0L is the initial (t=0) x-value of the characteristic meeting the
shock from the left, and x0R is the same but for the right side.
t is the current time.
vs is a python function giving the shock speed given the value
of c to the left and right of the shock (c_L and c_R). vc is a python function giving the
characteristic velocity given c. dvc_dc is a python function giving the rate of change of
vc with respect to c, for a given c. c_init is a python function giving the initial value
of c for a given x. dc_dx0 is a python function giving the rate of change of c_init with
respect to x.
"""
def shock_step(y, t, vs, vc, dvc_dc, c_init, dc_dx0):
x = y[0]
x0L = y[1]
x0R = y[2]
c_L = c_init(x0L)
c_R = c_init(x0R)
dx_dt = vs(c_L, c_R)
vc_L = vc(c_L)
vc_R = vc(c_R)
dcL_dx0 = dc_dx0(x0L)
dcR_dx0 = dc_dx0(x0R)
dx0L_dt = safe_div((vc_L - dx_dt), (-1 - t*dvc_dc(c_L)*dcL_dx0))
dx0R_dt = safe_div((vc_R - dx_dt), (-1 - t*dvc_dc(c_R)*dcR_dx0))
dcL_dt = dcL_dx0 * dx0L_dt
dcR_dt = dcR_dx0 * dx0R_dt
ret = [dx_dt, dx0L_dt, dx0R_dt]
#print(ret)
return ret
"""
Finds a solution to the shock position.
Q_expr is a sympy expression giving the flux in terms of c_sym.
c_init is a sympy expression giving the initial c in terms of x_sym.
x_shock is the x-coordinate of the start of the shock.
t_shock is the time of the start of the shock.
x0_shock is the x-coordinate whose characteristic leads into the start of the shock.
delta_x is the x simulation step and L is the total length.
delta_t is the t simulation step and T is the total time.
# (x0, t0) is the start of the shock
"""
def solve_shock(Q_expr, c_sym, c_init, x_sym, t_shock, x_shock, x0_shock,
delta_x, L, delta_t, T):
Q = lambdify(c_sym, Q_expr)
# Get function for char. velocity
vc_expr = diff(Q_expr, c_sym)
vc = lambdify(c_sym, vc_expr)
# Get function for shock speed (addition of 1e-9 prevents 0/0 error)
vs = lambda c_L,c_R: (Q(c_L) - Q(c_R) + vc(c_L)*1e-9) / (c_L - c_R + 1e-9)
dvc_dc = lambdify(c_sym, diff(vc_expr, c_sym))
dc_dx0 = lambdify(x_sym, diff(c_init, x_sym))
c_init_lamb = lambdify(x_sym, c_init)
t_eval = [t_shock+i*delta_t for i in range(int((T-t_shock)/delta_t))]
# Solve shock equation. Start with x=x0, x0L,x0R=x0_shock -+ epsillon
# x0L and x0R must start slightly off from x0_shock, otherwise they don't change.
sln = sp_int.solve_ivp(lambda t,y: shock_step(y, t, vs, vc,
dvc_dc, c_init_lamb, dc_dx0),
[t_shock, T],
[x_shock, x0_shock-1e-3, x0_shock+1e-3],
dense_output=True, atol=1e-3)
return sln.sol
"""
Get the global argmin of a function
"""
def get_argmin(f, x_sym, bounds):
f_lamb = lambdify(x_sym, f)
res = optimize.shgo(f_lamb, [bounds])
return res.x[0]
def get_all_argmin(f, x_sym, bounds):
f_lamb = lambdify(x_sym, f)
res = optimize.shgo(f_lamb, [bounds], iters=5,
sampling_method='sobol')
n = res.xl.shape[0]
minima = []
for i in range(n):
x = res.xl[i, 0]
if x > bounds[0]+1e-3 and x < bounds[1]-1e-3:
minima.append(x)
#print(minima)
return minima
def solve_characteristics(Q, c_sym, c_init, x_sym, delta_x, L, delta_t, T):
# Get expression for char. velocity
vc = diff(Q, c_sym)
#vc_diff = diff(vc, c_sym)
#plot_expr(vc_diff, c_sym, [0, 1000], 1000)
#exit(0)
# Derivative of vc(c_init(x)) wrt x
vc_diff = diff(vc, c_sym).subs(c_sym, c_init) * diff(c_init, x_sym)
# Find all the shock start points
x0_shock_arr = get_all_argmin(vc_diff, x_sym, (0, L))
shocks = []
print("Solving shocks... ", end="", flush=True)
# Solve each shock
for x0_shock in x0_shock_arr:
vc_shock = vc.subs(c_sym, c_init.subs(x_sym, x0_shock).evalf()).evalf()
t_shock = -1/(vc_diff.subs(x_sym, x0_shock).evalf())
x_shock = x0_shock + t_shock * vc_shock
shock_sln = solve_shock(Q, c_sym, c_init, x_sym, t_shock, x_shock, x0_shock,
delta_x, L, delta_t, T)
shocks.append({"t": t_shock, "x": x_shock, "x0": x0_shock, "sln": shock_sln})
print("done")
print(f"Shock starts at ({shocks[0]['x']}, {shocks[0]['t']})")
c_of_x0 = lambdify(x_sym, c_init)
vc_of_c = lambdify(c_sym, vc)
chars = []
print("Generating characteristics... [", end="", flush=True)
# Generate characteristics starting from uniform x-ordinates
ch_spacing = int(L/25)
for j in range(int(L/ch_spacing)): # For each characteristic
x0 = x = ch_spacing*j
# Empty lists to store x and t values
x_ch = [x0, x0]
t_ch = [0, 0]
# Value of c and vc along this characteristic
c_j = c_of_x0(x0)
vc_j = vc_of_c(c_j)
for i in range(int(T/delta_t)):
t = delta_t * i
x = x + vc_j*delta_t
hit_shock = False
for s in shocks:
# The shock solution is indexed in time from the start of the shock,
# so adjust the index accordingly
shock_idx = int(i - s["t"]/delta_t)
# If the char. crosses the shock, stop
if (t > s["t"]) and (
((x0 > s["x0"]-1e-1) and (x < s["sln"](t)[0]+1e-1)) or
((x0 < s["x0"]+1e-1) and (x > s["sln"](t)[0]-1e-1))):
hit_shock = True
break
if hit_shock:
break
x_ch[-1] = x
t_ch[-1] = t
# Store
chars.append({"x": x_ch, "t": t_ch})
if (int(20*len(chars)/int(L/ch_spacing)) >
int(20*(len(chars)-1)/int(L/ch_spacing))):
print("=", end="", flush=True)
print("] done")
return chars, shocks
def plot_shocks(shocks, delta_t, T, ax):
print("Plotting shocks... ", end="", flush=True)
# Plot the shock solutions
for s in shocks:
if s["t"] < T:
t_arr = [i*delta_t+s["t"] for i in range(int((T-s["t"])/delta_t))]
y = s["sln"](np.array(t_arr))[0,:] #[s["sln"](t)[0] for t in t_arr]
ax.plot(y, t_arr, color="blue")
print("done")
def plot_chars(chars, ax):
print("Plotting characteristics... ", end="", flush=True)
for ch in chars:
ax.plot(ch["x"], ch["t"], color="red")
print("done")
def gen_fig(prefix, g_val, alpha_val, f_val, w_expr, y_sym, a_sym, a_val,
delta_x, L, delta_t, T,
A_init, x_sym):
g, alpha, f = symbols("g,alpha,f")
# Width of channel wrt vertical coordinate (y)
y = y_sym #Symbol("y", positive=True, real=True)
a = Symbol("a", positive=True, real=True)
w = w_expr #a #sqrt(y/a)
plt.figure()
ax = plt.gca()
plot_expr(ax, A_init, x_sym, [0, L], int(L/delta_x))
plt.xlabel("Position (m)")
plt.ylabel("Initial Area (m^2)")
plt.savefig(prefix+"-init.pdf", bbox_inches="tight")
A_sym = Symbol("A", positive=True, real=True)
l_of_A = derive_wetted_perim(w, y, A_sym)
u = simplify(sqrt(A_sym*g*sin(alpha) / (f * l_of_A)))
Q = (A_sym*u).subs([("g", g_val), ("alpha", alpha_val), ("f", f_val), (a, a_val)])
chars, shocks = solve_characteristics(Q, A_sym, A_init,
x_sym, delta_x, L, delta_t, T)
plt.figure()
ax0 = plt.gca()
plot_chars(chars, ax0)
plot_shocks(shocks, delta_t, T, ax0)
ax0.axis([0, L, 0, T])
plt.xlabel("Position (m)")
plt.ylabel("Time (s)")
plt.savefig(prefix+"-shock.pdf", bbox_inches="tight")
plt.figure()
ax1 = plt.gca()
#plot_expr(diff(Q, A_sym), A_sym, [0,30], 1000)
#return
Q_lamb = lambdify(A_sym, Q)
x = [i*delta_x for i in range(int(L/delta_x))]
A_inflow = float(A_init.evalf(subs={x_sym: 0}))
A_init_lamb = lambdify(x_sym, A_init)
A_init_data = [A_init_lamb(xi) for xi in x]
t = [i*delta_t for i in range(int(T/delta_t))]
print("Solving Godunov... ", end="", flush=True)
c_sln = godunov_solve(Q_lamb, A_init_data, A_inflow, delta_x, t)
print("done")
n_xticks = 6
n_tticks = 6
x_ticks = [int(i*L/delta_x/(n_xticks-1)) for i in range(n_xticks)]
x_tick_labels = [delta_x*i for i in x_ticks]
t_ticks = [int(i*T/delta_t/(n_tticks-1)) for i in range(n_tticks)]
t_tick_labels = [delta_t*i for i in t_ticks]
print("Plotting Godunov... ", end="", flush=True)
plt.colorbar(plt.pcolor(c_sln))
ax1.imshow(c_sln, origin="lower", aspect="auto")
ax1.set_xticks(x_ticks, x_tick_labels)
ax1.set_yticks(t_ticks, t_tick_labels)
plt.xlabel("Position (m)")
plt.ylabel("Time (s)")
plt.savefig(prefix+"-godunov.pdf", bbox_inches="tight")
print("done")
def main():
y_sym = Symbol("y", positive=True, real=True)
x_sym = Symbol("x", positive=True, real=True)
a_sym = Symbol("a", positive=True, real=True)
gen_A_init = lambda A0,A1,s_slope,s_grad: (A0+A1)/2 - (A0-A1)/2*tanh((x_sym-s_slope)*s_grad)
plot_defaults = {
"g_val": 9.81, "alpha_val": 0.01, "f_val": 0.05, "w_expr": a_sym,
"y_sym": y_sym, "a_sym": a_sym, "a_val": 50,
"delta_x": 1, "L": 300, "delta_t": 1, "T": 200,
"A_init": gen_A_init(20,1,100,1/50),
"x_sym": x_sym
}
plots = [
#plot_defaults | { "prefix": "fig/baseline" },
#plot_defaults | { "prefix": "alpha-0.02", "alpha_val": 0.02 },
#plot_defaults | { "prefix": "f-0.1", "f_val": 0.1 },
#plot_defaults | { "prefix": "dx_0.25,dt_0.25", "delta_x": 0.25, "delta_t": 0.25 },
#plot_defaults | { "prefix": "fig/a-5", "a_val": 5 },
#plot_defaults | { "prefix": "fig/a-3", "a_val": 3, "L": 700, "T": 400 },
#plot_defaults | { "prefix": "fig/a-10", "a_val": 10 },
#plot_defaults | { "prefix": "fig/a-1", "a_val": 1 },
#plot_defaults | { "prefix": "fig/slope-0.1", "A_init": gen_A_init(20,1,100,0.1) },
#plot_defaults | { "prefix": "fig/A-pulse", "A_init": 1 + 19*1000/(1*1000+(x_sym-100)**2) },
plot_defaults | { "prefix": "fig/A-pulse-short", "A_init": 1 + 19*10/(1*10+(x_sym-100)**2) }
]
plt.rcParams.update({'font.size': 22})
for p in plots:
gen_fig(**p)
main()