Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pinnicle/physics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .continuity import *
from .dummy import *
from .stressbalance import *
from .stressbalance_MC import *
from .iceshelf import *
from .timeinvariant import *
from .friction import *
6 changes: 6 additions & 0 deletions pinnicle/physics/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,9 @@ def __init__(self, **kwargs):
self.variable_ub['u_base'] = 1.0e4/self.yts
self.variable_lb['v_base'] = -1.0e4/self.yts
self.variable_ub['v_base'] = 1.0e4/self.yts
self.variable_lb['D_dH'] = -1e3
self.variable_ub['D_dH'] = 1e3
self.variable_lb['D_smb'] = -1e3
self.variable_ub['D_smb'] = 1e3
self.variable_lb['R'] = -1e3
self.variable_ub['R'] = 1e3
192 changes: 192 additions & 0 deletions pinnicle/physics/continuity.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,80 @@ def _pde_jax(self, nn_input_var, nn_output_var): #{{{
return self._pde(nn_input_var, nn_output_var) #}}}
#}}}
#}}}

# Steady-state mass conservation (a=0) {{{
class MCsteadyEquationParameter(EquationParameter, Constants):
""" default parameters for mass conservation
"""
_EQUATION_TYPE = 'MC_steady'
def __init__(self, param_dict={}):
# load necessary constants
Constants.__init__(self)
super().__init__(param_dict)

def set_default(self):
self.input = ['x', 'y']
self.output = ['u', 'v', 'H']
self.output_lb = [self.variable_lb[k] for k in self.output]
self.output_ub = [self.variable_ub[k] for k in self.output]
self.data_weights = [1.0e-8*self.yts**2, 1.0e-8*self.yts**2, 1.0e-6]
self.residuals = ["f"+self._EQUATION_TYPE]
self.pde_weights = [1.0e10]

# scalar variables: name:value
self.scalar_variables = {}
class MC_steady(EquationBase): #{{{
""" MC on 2D problem with negligible smb and dH/dt
"""
_EQUATION_TYPE = 'MC_steady'
def __init__(self, parameters=MCEquationParameter()):
super().__init__(parameters)

def _pde(self, nn_input_var, nn_output_var): #{{{
""" residual of MC 2D PDE

Args:
nn_input_var: global input to the nn
nn_output_var: global output from the nn
"""
# get the ids
xid = self.local_input_var["x"]
yid = self.local_input_var["y"]

uid = self.local_output_var["u"]
vid = self.local_output_var["v"]
Hid = self.local_output_var["H"]

# unpacking normalized output
u = slice_column(nn_output_var, uid)
v = slice_column(nn_output_var, vid)
H = slice_column(nn_output_var, Hid)

# spatial derivatives
u_x = jacobian(nn_output_var, nn_input_var, i=uid, j=xid)
H_x = jacobian(nn_output_var, nn_input_var, i=Hid, j=xid)
v_y = jacobian(nn_output_var, nn_input_var, i=vid, j=yid)
H_y = jacobian(nn_output_var, nn_input_var, i=Hid, j=yid)

# residual
f = H*u_x + H_x*u + H*v_y + H_y*v

return [f] #}}}
def _pde_jax(self, nn_input_var, nn_output_var): #{{{
""" residual of MC 2D PDE, jax version

Args:
nn_input_var: global input to the nn
nn_output_var: global output from the nn
"""
return self._pde(nn_input_var, nn_output_var) #}}}
#}}}
#}}}





# time dependent mass conservation {{{
class ThicknessEquationParameter(EquationParameter, Constants):
""" default parameters for mass conservation
Expand Down Expand Up @@ -147,3 +221,121 @@ def _pde_jax(self, nn_input_var, nn_output_var): #{{{
return self._pde(nn_input_var, nn_output_var) #}}}
#}}}
#}}}




# D-HNN exact mass conservation {{{
class MCexactEquationParameter(EquationParameter, Constants):
""" default parameters for mass conservation
"""
_EQUATION_TYPE = 'MC_exact'
def __init__(self, param_dict={}):
# load necessary constants
Constants.__init__(self)
super().__init__(param_dict)

def set_default(self):
self.input = ['x', 'y']
self.output = ['D_smb','D_dH','R', 'H']
self.output_lb = [self.variable_lb[k] for k in self.output]
self.output_ub = [self.variable_ub[k] for k in self.output]
self.data_weights = [1.0, 1.0, 1.0, 1.0e-6]
self.residuals = []
self.pde_weights = []

# scalar variables: name:value
self.scalar_variables = {}
class MC_exact(EquationBase): #{{{
""" MC on 2D problem

for domains with negligible smb and dH/dt

u,v,a are defined based on two scalar fields D,R
in a way that automatically satisfies the MC
"""
_EQUATION_TYPE = 'MC_exact'
def __init__(self, parameters=MCexactEquationParameter()):
super().__init__(parameters)

def _pde(self, nn_input_var, nn_output_var): #{{{
""" no pde loss required
use data losses vel_mag_MC, u_MC, v_MC, a_MC

Args:
nn_input_var: global input to the nn
nn_output_var: global output from the nn
"""
Hid = self.local_output_var["H"]
H = slice_column(nn_output_var, Hid)

return [] #}}}

def _pde_jax(self, nn_input_var, nn_output_var): #{{{
""" residual of MC 2D PDE, jax version

Args:
nn_input_var: global input to the nn
nn_output_var: global output from the nn
"""
return self._pde(nn_input_var, nn_output_var) #}}}
#}}}
#}}}




# D-HNN exact mass conservation {{{
class MCSteadyexactEquationParameter(EquationParameter, Constants):
""" default parameters for mass conservation
"""
_EQUATION_TYPE = 'MCSteady_exact'
def __init__(self, param_dict={}):
# load necessary constants
Constants.__init__(self)
super().__init__(param_dict)

def set_default(self):
self.input = ['x', 'y']
self.output = ['R', 'H']
self.output_lb = [self.variable_lb[k] for k in self.output]
self.output_ub = [self.variable_ub[k] for k in self.output]
self.data_weights = [1.0, 1.0e-6]
self.residuals = []
self.pde_weights = []

# scalar variables: name:value
self.scalar_variables = {}
class MCSteady_exact(EquationBase): #{{{
""" MC on 2D problem

for domains with negligible smb and dH/dt

u,v,a are defined based on two scalar fields D,R
in a way that automatically satisfies the MC
"""
_EQUATION_TYPE = 'MCSteady_exact'
def __init__(self, parameters=MCSteadyexactEquationParameter()):
super().__init__(parameters)

def _pde(self, nn_input_var, nn_output_var): #{{{
""" no pde loss required
use data losses vel_mag_MC, u_MC, v_MC, a_MC

Args:
nn_input_var: global input to the nn
nn_output_var: global output from the nn
"""
return [] #}}}

def _pde_jax(self, nn_input_var, nn_output_var): #{{{
""" residual of MC 2D PDE, jax version

Args:
nn_input_var: global input to the nn
nn_output_var: global output from the nn
"""
return self._pde(nn_input_var, nn_output_var) #}}}
#}}}
#}}}

117 changes: 117 additions & 0 deletions pinnicle/physics/physics.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,120 @@ def operator(self, pname):
if p._EQUATION_TYPE.lower() == pname.lower():
return p.pde



### transform functions for mass-conserving stressbalance:
def DR_xy(self, nn_input_var, nn_output_var):
""" transform D,R scalar fields of mass-conserving stressbalance
to u,v,a
"""

xid = self.input_var.index('x')
yid = self.input_var.index('y')

Rid = self.output_var.index('R')
R_x = jacobian(nn_output_var, nn_input_var, i=Rid, j=xid)
R_y = jacobian(nn_output_var, nn_input_var, i=Rid, j=yid)

if "D_dH" in self.output_var:
# for MC_exact
Did = self.output_var.index('D_dH')
DdH_x = jacobian(nn_output_var, nn_input_var, i=Did, j=xid)
DdH_y = jacobian(nn_output_var, nn_input_var, i=Did, j=yid)
else:
# for MCSteady_exact
DdH_x = R_x*1e-32
DdH_y = R_y*1e-32

if "D_smb" in self.output_var:
# for MC_exact
Did = self.output_var.index('D_smb')
DdH_x = jacobian(nn_output_var, nn_input_var, i=Did, j=xid)
DdH_y = jacobian(nn_output_var, nn_input_var, i=Did, j=yid)
else:
# for MCSteady_exact
Dsmb_x = R_x*1e-32
Dsmb_y = R_y*1e-32

return [Dsmb_x, Dsmb_y, DdH_x, DdH_y, R_x, R_y]

def DR_to_u(self, nn_input_var, nn_output_var):
""" recover u from scalar fields D,R
"""
Hid = self.output_var.index('H')
H = slice_column(nn_output_var, Hid)
Dsmb_x, _, DdH_x, _, _, R_y = self.DR_xy(nn_input_var,nn_output_var)
# Hd = H.detach()
u = (Dsmb_x + DdH_x - R_y) / H
return u

def DR_to_v(self, nn_input_var, nn_output_var):
""" recover v from scalar fields D,R
"""
Hid = self.output_var.index('H')
H = slice_column(nn_output_var, Hid)
_, Dsmb_y, _, DdH_y, R_x, _ = self.DR_xy(nn_input_var,nn_output_var)
# Hd = H.detach()
v = (Dsmb_y + DdH_y + R_x) / H # divide D_x by yts if dHdt in m/a
return v

def DR_to_dH(self, nn_input_var, nn_output_var):
""" recover a from scalar fields D,R
a = div(grad(D))
"""
xid = self.input_var.index('x')
yid = self.input_var.index('y')
_, _, DdH_x, DdH_y, _, _ = self.DR_xy(nn_input_var,nn_output_var)
DdH_xx = jacobian(DdH_x, nn_input_var, i=0, j=xid)
DdH_yy = jacobian(DdH_y, nn_input_var, i=0, j=yid)
dH = -1. * (DdH_xx + DdH_yy) ## == div(Hv)
return dH

def DR_to_smb(self, nn_input_var, nn_output_var):
""" recover a from scalar fields D,R
a = div(grad(D))
"""
xid = self.input_var.index('x')
yid = self.input_var.index('y')
Dsmb_x, Dsmb_y, _, _, _, _ = self.DR_xy(nn_input_var,nn_output_var)
Dsmb_xx = jacobian(Dsmb_x, nn_input_var, i=0, j=xid)
Dsmb_yy = jacobian(Dsmb_y, nn_input_var, i=0, j=yid)
smb = Dsmb_xx + Dsmb_yy ## == div(Hv)
return smb

def vel_mag_MC(self, nn_input_var, nn_output_var, X):
""" a wrapper for PointSetOperatorBC func call, Args need to follow the requirment by deepxde

Args:
nn_input_var: input tensor to the nn
nn_output_var: output tensor from the nn
X: NumPy array of the collocation points defined on the boundary, required by deepxde
"""
u = self.DR_to_u(nn_input_var,nn_output_var)
v = self.DR_to_v(nn_input_var,nn_output_var)
vel = ppow((bkd.square(u) + bkd.square(v) + 1.0e-30), 0.5)
return vel

def dH_MC(self, nn_input_var, nn_output_var, X):
""" a wrapper for PointSetOperatorBC func call, Args need to follow the requirment by deepxde
"""
dH = self.DR_to_dH(nn_input_var,nn_output_var)
return dH

def smb_MC(self, nn_input_var, nn_output_var, X):
""" a wrapper for PointSetOperatorBC func call, Args need to follow the requirment by deepxde
"""
smb = self.DR_to_smb(nn_input_var,nn_output_var)
return smb

def u_MC(self, nn_input_var, nn_output_var, X):
""" a wrapper for PointSetOperatorBC func call, Args need to follow the requirment by deepxde
"""
u = self.DR_to_u(nn_input_var,nn_output_var)
return u

def v_MC(self, nn_input_var, nn_output_var, X):
""" a wrapper for PointSetOperatorBC func call, Args need to follow the requirment by deepxde
"""
v = self.DR_to_v(nn_input_var,nn_output_var)
return v
Loading
Loading