Source code for eml_urban_hydro_model.model

import numpy as np
import pandera as pa
from pandera.typing import DataFrame

from .rho import rho
from .type_models.model_input import ModelInput
from .type_models.model_parameters import ModelParameters
from .type_models.model_output import ModelOutput
from .type_models.rho_parameters import RhoParameters


[docs] @pa.check_types def model_st(df: DataFrame[ModelInput], params: ModelParameters) -> DataFrame[ModelOutput]: Qirr = params.Qirr if params.Qirr is not None else (params.area_params.omegaSoil * params.E_max / (100 * 86400)) rain = df["precp"].copy().fillna(0) rain_lag = rain if params.lag == 0 else np.concatenate((np.zeros(params.lag), rain.iloc[: -params.lag])) Kc = df["time"].dt.month.apply(lambda x: 1 if x in [12, 1, 2, 9, 10, 11] else 2) # Soil parameters Ks = params.soil_params.Ks n = params.soil_params.n beta = params.soil_params.beta s_h = params.soil_params.sh / params.soil_params.sfc s_w = params.soil_params.sw / params.soil_params.sfc s_s = params.soil_params.ss / params.soil_params.sfc s_fc = params.soil_params.sfc / params.soil_params.sfc # Initialization y0 = [0, s_s, 0, 0] y_result = np.zeros( (df.shape[0], 4) ) # increased to 4 for roof vector (index: 0 for Road, 1 for Soil, 2 for Roof, 3 for Tank) y_result[0] = y0 dydt = np.zeros(4) # increased to 4 for roof vector Qsoil = np.zeros(df.shape[0]) Qpolicy_irr = np.zeros(df.shape[0]) Vpolicy_flush = np.zeros(df.shape[0]) Qout = np.zeros(df.shape[0]) Qout_raw = np.zeros(df.shape[0]) Qroad = np.zeros(df.shape[0]) Qroof = np.zeros(df.shape[0]) Qtank = np.zeros(df.shape[0]) Vroad = np.zeros(df.shape[0]) Vroof = np.zeros(df.shape[0]) # Vtank = np.zeros(df.shape[0]) # Not used steps_per_hour = 60 / params.resolution # Number of time steps per hour lam = params.flushing_frequency / steps_per_hour # Poisson mean for i in range(df.shape[0] - 1): p = rain_lag.iloc[i] / (1000 * 60 * params.resolution) # Convert from mm/5min to m/s pm = p # Modeled precipitation # Sample from a Poisson distribution (flush count per time step) flushing = np.random.poisson(lam) # ============ Qsoil ===============# if y_result[i, 1] >= s_fc: Qsoil[i] = p * params.area_params.omegaSoil elif p > params.heavy: Qsoil[i] = (p - params.heavy) * params.area_params.omegaSoil else: Qsoil[i] = 0 p_eff = ( p * params.area_params.omegaSoil - Qsoil[i] ) / params.area_params.omegaSoil # Precipitation that goes into the soil # ============ Qroad ===============# Qroad[i] = ( params.k * y_result[i, 0] * params.area_params.omegaRoad / 86400 ) # Water flow at time point i the k should be different in roads and roofs Vroad[i] = y_result[i, 0] * params.area_params.omegaRoad # Water volume of Road reservoir at time point i # Qroad[i] = k * V_runoff[i] # Turn k [/day] to k [/s] # ============ Qroof ===============# Vroof[i] = y_result[i, 2] * (1 - params.frac_rt2tk) * params.area_params.omegaRoof Qroof[i] = ( params.k * y_result[i, 2] * (1 - params.frac_rt2tk) * params.area_params.omegaRoof / 86400 ) # Water flow at time point i # ==============Road reservoir==========# dydt[0] = pm - Qroad[i] / params.area_params.omegaRoad # roof, tank and roads use pm (modeled precipitation) y_result[i + 1, 0] = y_result[i, 0] + dydt[0] * 60 * params.resolution if y_result[i + 1, 0] < 0: y_result[i + 1, 0] = 0 # ==============Tank reservoir==========# Qpolicy_irr[i] = ( Kc.iloc[i] * Qirr if ( pm == 0 and y_result[i, 1] < s_fc - 0.1 and y_result[i, 3] >= Kc.iloc[i] * Qirr * params.resolution * 60 ) else 0 ) Vpolicy_flush[i] = ( flushing * 0.02 if (y_result[i, 3] > 0.02 * flushing) else y_result[i, 3] ) # multiply by random 0 or 1 if params.frac_rt2tk != 0: # Calculate inflow and outflows dydt[3] = pm * (params.frac_rt2tk * params.area_params.omegaRoof) - (Qtank[i] + Qpolicy_irr[i]) y_result[i + 1, 3] = y_result[i, 3] - Vpolicy_flush[i] + dydt[3] * 60 * params.resolution # Ensure tank volume is non-negative if y_result[i + 1, 3] < 0: y_result[i + 1, 3] = 0 # Check for overflow if y_result[i + 1, 3] >= params.Vmax: Qtank[i] = (y_result[i + 1, 3] - params.Vmax) / (60 * params.resolution) # Overflow rate y_result[i + 1, 3] = params.Vmax else: Qtank[i] = 0 else: dydt[3] = 0 y_result[:, 3] = 0 # ==============Roof reservoir==========# dydt[2] = ( pm - Qroof[i] / ((1 - params.frac_rt2tk) * params.area_params.omegaRoof) if params.frac_rt2tk != 1 else 0 ) # roof, tank and roads use pm (modeled precipitation) y_result[i + 1, 2] = y_result[i, 2] + dydt[2] * 60 * params.resolution if y_result[i + 1, 2] < 0: y_result[i + 1, 2] = 0 # ==============Soil reservoir==========# Z_soil = n * params.vegetation_params.Z_root rho_params = RhoParameters(n=n, model_params=params, beta=beta, s_h=s_h, s_w=s_w, s_s=s_s, s_fc=s_fc, Ks=Ks) dydt[1] = (p_eff + Qpolicy_irr[i] / params.area_params.omegaSoil) / (Z_soil / 100) - rho( y_result[i, 1], rho_params ) / 86400 # (1+(1-frac_rt2tk)*frac_rt2s*omegat/omegas) y_result[i + 1, 1] = y_result[i, 1] + dydt[1] * 60 * params.resolution if y_result[i + 1, 1] >= 1: y_result[i + 1, 1] = 1 # ============ Qout ===============# Qout_raw[i] = Qroad[i] + Qroof[i] + Qtank[i] + Qsoil[i] # Total outflow at time point i # Weighed runoff coefficient (based on area proportions) total_area = params.area_params.omegaSoil + params.area_params.omegaRoad + params.area_params.omegaRoof weighted_runoff_coeff = ( (params.area_params.omegaSoil / total_area) * params.area_params.runoff_coeff_soil + (params.area_params.omegaRoad / total_area) * params.area_params.runoff_coeff_roads + (params.area_params.omegaRoof / total_area) * params.area_params.runoff_coeff_roofs ) # Apply weighted runoff coefficient to Qout_raw Qout = Qout_raw * weighted_runoff_coeff df_return = df[["time", "precp"]].copy() df_return["Qout"] = Qout df_return["Qout_raw"] = Qout_raw df_return["Qroad"] = Qroad df_return["Qroof"] = Qroof df_return["Qsoil"] = Qsoil df_return["Qtank"] = Qtank df_return["Qirr"] = Qpolicy_irr df_return["Vflush"] = Vpolicy_flush df_return["soil_mois"] = y_result[:, 1] df_return["v_tank"] = y_result[:, 3] return df_return