Other sample problems
Optimizing deterministic material models
The example in examples/structural-inference/tension/deterministic repeats the example described in the detailed tutorial except targeting a deterministic, instead of a statistical, model. The model uses the same synthetic data used in the statistical inference tutorial but fits a deterministic best fit model to the data. The example illustrates how to use the package to train deterministic models and also demonstrates the potential utility of the L-BFGS optimizer when (a) you are training deterministic models and (b) you have sufficient GPU memory available.
#!/usr/bin/env python3
"""
Helper functions for the structural material model inference with
tension tests examples.
"""
import sys
sys.path.append("../../..")
import numpy as np
import numpy.random as ra
import xarray as xr
import torch
from pyoptmat import models, flowrules, hardening, optimize
from pyoptmat.temperature import ConstantParameter as CP
from tqdm import tqdm
import tqdm
import warnings
warnings.filterwarnings("ignore")
# Use doubles
torch.set_default_tensor_type(torch.DoubleTensor)
# Actual parameters
E_true = 150000.0
R_true = 200.0
d_true = 5.0
n_true = 7.0
eta_true = 300.0
s0_true = 50.0
# Scale factor used in the model definition
sf = 0.5
# Select device to run on
if torch.cuda.is_available():
dev = "cuda:0"
else:
dev = "cpu"
device = torch.device(dev)
def make_model(E, n, eta, s0, R, d, device=torch.device("cpu"), **kwargs):
"""
Key function for the entire problem: given parameters generate the model
"""
isotropic = hardening.VoceIsotropicHardeningModel(
CP(
R,
scaling=optimize.bounded_scale_function(
(
torch.tensor(R_true * (1 - sf), device=device),
torch.tensor(R_true * (1 + sf), device=device),
)
),
),
CP(
d,
scaling=optimize.bounded_scale_function(
(
torch.tensor(d_true * (1 - sf), device=device),
torch.tensor(d_true * (1 + sf), device=device),
)
),
),
)
kinematic = hardening.NoKinematicHardeningModel()
flowrule = flowrules.IsoKinViscoplasticity(
CP(
n,
scaling=optimize.bounded_scale_function(
(
torch.tensor(n_true * (1 - sf), device=device),
torch.tensor(n_true * (1 + sf), device=device),
)
),
),
CP(
eta,
scaling=optimize.bounded_scale_function(
(
torch.tensor(eta_true * (1 - sf), device=device),
torch.tensor(eta_true * (1 + sf), device=device),
)
),
),
CP(
s0,
scaling=optimize.bounded_scale_function(
(
torch.tensor(s0_true * (1 - sf), device=device),
torch.tensor(s0_true * (1 + sf), device=device),
)
),
),
isotropic,
kinematic,
)
model = models.InelasticModel(
CP(
E,
scaling=optimize.bounded_scale_function(
(
torch.tensor(E_true * (1 - sf), device=device),
torch.tensor(E_true * (1 + sf), device=device),
)
),
),
flowrule,
)
return models.ModelIntegrator(model, **kwargs)
def generate_input(erates, emax, ntime):
"""
Generate the times and strains given the strain rates, maximum strain, and number of time steps
"""
strain = torch.repeat_interleave(
torch.linspace(0, emax, ntime, device=device)[None, :], len(erates), 0
).T.to(device)
time = strain / erates
return time, strain
def downsample(rawdata, nkeep, nrates, nsamples):
"""
Return fewer than the whole number of samples for each strain rate
"""
ntime = rawdata[0].shape[1]
return tuple(
data.reshape(data.shape[:-1] + (nrates, nsamples))[..., :nkeep].reshape(
data.shape[:-1] + (-1,)
)
for data in rawdata
)
if __name__ == "__main__":
# Running this script will regenerate the data
ntime = 200
emax = 0.5
erates = torch.logspace(-2, -8, 7, device=device)
nrates = len(erates)
nsamples = 50
scales = [0.0, 0.01, 0.05, 0.1, 0.15]
times, strains = generate_input(erates, emax, ntime)
for scale in scales:
print("Generating data for scale = %3.2f" % scale)
full_times = torch.empty((ntime, nrates, nsamples), device=device)
full_strains = torch.empty_like(full_times)
full_stresses = torch.empty_like(full_times)
full_temperatures = torch.zeros_like(full_strains)
for i in tqdm.tqdm(range(nsamples)):
full_times[:, :, i] = times
full_strains[:, :, i] = strains
# True values are 0.5 with our scaling so this is easy
model = make_model(
torch.tensor(0.5, device=device),
torch.tensor(ra.normal(0.5, scale), device=device),
torch.tensor(ra.normal(0.5, scale), device=device),
torch.tensor(ra.normal(0.5, scale), device=device),
torch.tensor(ra.normal(0.5, scale), device=device),
torch.tensor(ra.normal(0.5, scale), device=device),
)
with torch.no_grad():
full_stresses[:, :, i] = model.solve_strain(
times, strains, full_temperatures[:, :, i]
)[:, :, 0]
full_cycles = torch.zeros_like(full_times, dtype=int, device=device)
types = np.array(["tensile"] * (nsamples * len(erates)))
controls = np.array(["strain"] * (nsamples * len(erates)))
ds = xr.Dataset(
{
"time": (["ntime", "nexp"], full_times.flatten(-2, -1).cpu().numpy()),
"strain": (
["ntime", "nexp"],
full_strains.flatten(-2, -1).cpu().numpy(),
),
"stress": (
["ntime", "nexp"],
full_stresses.flatten(-2, -1).cpu().numpy(),
),
"temperature": (
["ntime", "nexp"],
full_temperatures.cpu().flatten(-2, -1).numpy(),
),
"cycle": (["ntime", "nexp"], full_cycles.flatten(-2, -1).cpu().numpy()),
"type": (["nexp"], types),
"control": (["nexp"], controls),
},
attrs={"scale": scale, "nrates": nrates, "nsamples": nsamples},
)
ds.to_netcdf("scale-%3.2f.nc" % scale)
The example in examples/structural-inference/creep-fatigue/deterministic
repeats a similar example using synthetic cyclic (creep-fatigue) test
data and a pyoptmat.hardening.ChabocheHardeningModel viscoplastic material model.
Tutorial example for cyclic load
examples/structural-inference/creep-fatigue/statistical basically
repeats the tutorial, but using
synthetic cyclic test data a pyoptmat.hardening.ChabocheHardeningModel viscoplastic model capable
of capturing kinematic hardening behavior.
Neural ODE example
Inferring the response of a coupled mass-spring-damper system using trajectory data (1) a physical ODE model and (2) a neural ODE model.
#!/usr/bin/env python3
"""
Sample problem trying to infer the response of a chain of serially
connected mass-spring-dashpot 1D elements, fixed at one end of the
chain and subject to a force on the other end.
The number of elements in the chain is `n_chain`.
This example
1. Generates `n_samples` trajectories under a random force described by
... We separate out only the end displacements as observable,
all the interior displacements are treated as hidden state.
2. Try to infer the mass/spring/dashpot constants and the system response
using the analytical ODE as a model, given the end-displacement
trajectories as input.
3. Again try to infer the response from the end displacement trajectories
but now using a neural ODE as the model.
The integration will use `n_time` points in the integration.
"""
import itertools
import torch
from torch import nn
from functorch import vmap, jacrev
import tqdm
import matplotlib.pyplot as plt
import sys
sys.path.append("../..")
from pyoptmat import ode, utility, neuralode
if torch.cuda.is_available():
dev = "cuda:0"
else:
dev = "cpu"
device = torch.device(dev)
# I'm assuming single precision will be fine for this but for now use doubles
torch.set_default_tensor_type(torch.DoubleTensor)
class MassDamperSpring(torch.nn.Module):
"""
Mass, spring, damper system of arbitrary size
I use simple substitution to solve for the velocity and displacement
in a first order system, so the size of the system is twice the size of the
number of elements.
Args:
K (torch.tensor): (n_chain,) vector of stiffnesses
C (torch.tensor): (n_chain,) vector of dampers
M (torch.tensor): (n_chain,) vector of masses
force (function): gives f(t)
"""
def __init__(self, K, C, M, force, C_scale = 1.0e-4, M_scale = 1.0e-5):
super().__init__()
self.K = torch.nn.Parameter(K)
self.C = torch.nn.Parameter(C)
self.M = torch.nn.Parameter(M)
self.force = force
self.C_scale = C_scale
self.M_scale = M_scale
self.half_size = K.shape[0]
self.size = 2 * self.half_size
def forward(self, t, y):
"""
Return the state rate
Args:
t (torch.tensor): (nchunk, nbatch) tensor of times
y (torch.tensor): (nchunk, nbatch, size) tensor of state
Returns:
y_dot: (nchunk, nbatch, size) tensor of state rates
J: (nchunk, nbatch, size, size) tensor of Jacobians
"""
ydot = torch.zeros_like(y)
J = torch.zeros(y.shape + (self.size,), device = t.device)
# Separate out for convenience
u = y[...,:self.half_size]
v = y[...,self.half_size:]
# Differences
du = torch.diff(u, dim = -1)
dv = torch.diff(v, dim = -1)
# Scaled properties
M = self.M * self.M_scale
C = self.C * self.C_scale
# Rate
# Velocity
ydot[...,:self.half_size] = v
# Springs
ydot[...,self.half_size:-1] += self.K[...,:-1] * du / M[...,:-1]
ydot[...,self.half_size+1:] += -self.K[...,:-1] * du / M[...,1:]
ydot[...,-1] += -self.K[...,-1] * u[...,-1] / M[...,-1]
ydot[...,self.half_size] += self.force(t) / M[...,0]
# Dampers
ydot[...,self.half_size:-1] += C[...,:-1] * dv / M[...,:-1]
ydot[...,self.half_size+1:] += -C[...,:-1] * dv / M[...,1:]
ydot[...,-1] += -C[...,-1] * v[...,-1] / M[...,-1]
# Derivative
# Velocity
J[...,:self.half_size,self.half_size:] += torch.eye(self.half_size, device = t.device)
# Springs
J[...,self.half_size:,:self.half_size] += torch.diag_embed(-self.K / M)
J[...,self.half_size+1:,1:self.half_size] += torch.diag_embed(-self.K[...,:-1] / M[...,1:])
J[...,self.half_size:,:self.half_size] += torch.diag_embed(self.K[...,:-1] / M[...,:-1], offset = 1)
J[...,self.half_size:,:self.half_size] += torch.diag_embed(self.K[...,:-1] / M[...,1:], offset = -1)
# Dampers
J[...,self.half_size:,self.half_size:] += torch.diag_embed(-C / M)
J[...,self.half_size+1:,self.half_size+1:] += torch.diag_embed(-C[...,:-1] / M[...,1:])
J[...,self.half_size:,self.half_size:] += torch.diag_embed(C[...,:-1] / M[...,:-1], offset = 1)
J[...,self.half_size:,self.half_size:] += torch.diag_embed(C[...,:-1] / M[...,1:], offset = -1)
return ydot, J
def initial_condition(self, nsamples):
"""
I don't want to learn the initial condition (we could)
so just return deterministic zeros
"""
return torch.zeros(nsamples, self.size, device = device)
def neural_ode_factory(n_hidden, n_layers, n_inter):
"""Make a neural ODE to try
Args:
n_hidden (int): number of hidden state variables
n_layers (int): network depth
n_inter (int): size of hidden layers
"""
n_in = n_hidden + 2
n_out = n_hidden + 1
n_inter += 1
mods = [nn.Linear(n_in, n_inter), nn.ReLU()
] + list(itertools.chain(*[[nn.Linear(n_inter, n_inter), nn.ReLU()] for i in range(n_layers)])
) + [nn.Linear(n_inter, n_out)]
return nn.Sequential(*mods)
def random_walk(time, mean, scale, mag):
"""
Simulate a random walk with velocity of the given mean and scale
"""
results = torch.zeros_like(time)
for i in range(1, time.shape[0]):
dt = time[i] - time[i-1]
v = mag*torch.normal(mean, scale)
results[i] = results[i-1] + v * dt
return results
if __name__ == "__main__":
# Basic parameters
n_chain = 5 # Number of spring-dashpot-mass elements
n_samples = 5 # Number of random force samples
n_time = 512+1 # Number of time steps
integration_method = 'backward-euler'
direct_solver = "thomas" # Batched, block, bidiagonal direct solver method
# Time chunking -- best value may vary on your system
n_chunk = 2**7
# Ending time
t_end = 1.0
# Mean and standard deviation for random force "velocity"
force_mean = torch.tensor(0.0, device = device)
force_std = torch.tensor(1.0, device = device)
force_mag = torch.tensor(1.0, device = device)
# True values of the mass, damping, and stiffness
K = torch.rand(n_chain, device = device)
C = torch.rand(n_chain, device = device)
M = torch.rand(n_chain, device = device)
# Time values
time = torch.linspace(0, t_end, n_time, device = device).unsqueeze(-1).expand(n_time, n_samples)
# 1. Generate the data
# Generate the force values
force = random_walk(time, force_mean.unsqueeze(0).expand(n_samples), force_std.unsqueeze(0).expand(n_samples), force_mag.unsqueeze(0).expand(n_samples))
# Plot them
plt.figure()
plt.plot(time.cpu().numpy(), force.cpu().numpy())
plt.show()
# Interpolate in time
force_continuous = utility.ArbitraryBatchTimeSeriesInterpolator(time, force)
# Setup the ground truth model
model = MassDamperSpring(K, C, M, force_continuous)
# Generate the initial condition
y0 = model.initial_condition(n_samples)
# Generate the data
with torch.no_grad():
y_data = ode.odeint(model, y0, time, method = integration_method,
block_size = n_chunk,
direct_solve_method = direct_solver)
# The observations will just be the first entry
observable = y_data[...,0]
# Plot them
plt.figure()
plt.plot(time.cpu().numpy(), observable.cpu().numpy())
plt.show()
# 2. Infer with the actual model
ode_model = MassDamperSpring(torch.rand(n_chain, device = device),
torch.rand(n_chain, device = device),
torch.rand(n_chain, device = device), force_continuous)
# Training parameters...
niter = 1000
lr = 1.0e-3
loss = torch.nn.MSELoss(reduction = "sum")
# Optimization setup
optim = torch.optim.Adam(ode_model.parameters(), lr)
def closure():
optim.zero_grad()
pred = ode.odeint_adjoint(ode_model, y0, time,
method = integration_method,
block_size = n_chunk,
direct_solve_method = direct_solver)
obs = pred[...,0]
lossv = loss(obs, observable)
lossv.backward()
return lossv
# Optimization loop
t = tqdm.tqdm(range(niter), total = niter, desc = "Loss: ")
loss_history = []
for i in t:
closs = optim.step(closure)
loss_history.append(closs.detach().cpu().numpy())
t.set_description("Loss: %3.2e" % loss_history[-1])
# Plot the loss history
plt.figure()
plt.plot(loss_history)
plt.show()
# Make the final predictions
with torch.no_grad():
ode_preds = ode.odeint(ode_model, y0, time, method = integration_method, block_size = n_chunk)
ode_obs = ode_preds[...,0]
# Make a plot
plt.figure()
for i in range(n_samples):
l, = plt.plot(time.cpu().numpy()[:,i], observable.cpu().numpy()[:,i])
plt.plot(time.cpu().numpy()[:,i], ode_obs.cpu().numpy()[:,i], ls = '--', color = l.get_color())
plt.show()
# 2. Infer with a neural ODE
# Setup the model
n_hidden = n_chain # I don't know, seems reasonable
n_layers = 3
n_inter = n_chain # Same thing, seems reasonable
nn = neural_ode_factory(n_hidden, n_layers, n_inter).to(device)
y0 = torch.zeros(n_samples, n_hidden+1, device = device)
nn_model = neuralode.NeuralODE(nn, lambda t: force_continuous(t).unsqueeze(-1))
# Training parameters
niter = 1000
lr = 1.0e-3
loss = torch.nn.MSELoss(reduction = "sum")
# Optimization setup
optim = torch.optim.Adam(nn_model.parameters(), lr)
def closure():
optim.zero_grad()
pred = ode.odeint_adjoint(nn_model, y0, time, method = integration_method, block_size = n_chunk)
obs = pred[...,0]
lossv = loss(obs, observable)
lossv.backward()
return lossv
# Optimization loop
t = tqdm.tqdm(range(niter), total = niter, desc = "Loss: ")
loss_history = []
for i in t:
closs = optim.step(closure)
loss_history.append(closs.detach().cpu().numpy())
t.set_description("Loss: %3.2e" % loss_history[-1])
# Plot the loss history
plt.figure()
plt.plot(loss_history)
plt.show()
# Make the final predictions
with torch.no_grad():
nn_preds = ode.odeint(nn_model, y0, time, method = integration_method, block_size = n_chunk)
nn_obs = nn_preds[...,0]
# Make a plot
plt.figure()
for i in range(n_samples):
l, = plt.plot(time.cpu().numpy()[:,i], observable.cpu().numpy()[:,i])
plt.plot(time.cpu().numpy()[:,i], nn_obs.cpu().numpy()[:,i], ls = '--', color = l.get_color())
plt.show()
# Compare all three
plt.figure()
for i in range(n_samples):
l, = plt.plot(time.cpu().numpy()[:,i], observable.cpu().numpy()[:,i])
plt.plot(time.cpu().numpy()[:,i], ode_obs.cpu().numpy()[:,i], ls = '--', color = l.get_color())
plt.plot(time.cpu().numpy()[:,i], nn_obs.cpu().numpy()[:,i], ls = ':', color = l.get_color())
plt.show()
Implicit versus explicit integration
The example in examples/ode/vanderpohl.py demonstrates the need for an implicit integration scheme when integrating stiff systems of ODEs, including structural constitutive models. The example uses the ode package to integrate the classic van der Pol equation with different parameters to demonstrate the need for implicit time integration as the equations become stiff. The results of the calculations are differentiable using either direct torch AD or the adjoint methods.
#!/usr/bin/env python3
"""
This example demonstrate the ability of the backward Euler integration
option to accurately integrate a very stiff system of ODEs. The example
system used here is the Van der Pol system from Manichev et al. 2019.
.. math::
\\dot{y}_0 = y_1
\\dot{y}_1 = -y_0 + \\mu \\left(1 - y_0^2 \\right) y_1
with initial conditions
.. math::
y_0(0) = -1
y_1(0) = 1
and :math:`t \\in [0,4.2\\mu].
This system of ODEs is parameterized by :math:`\\mu`. For
:math:`mu = 1.0` the system is not stiff and can be integrated by
either forward or backward methods. For :math:`\\mu=14` the equations
begin to become stiff and the forward Euler method becomes inaccurate.
For :math:`\\mu=30` the equations are very stiff and can only
be integrated with the backward Euler method.
"""
import sys
sys.path.append("../..")
import numpy as np
import torch
import matplotlib.pyplot as plt
from pyoptmat import ode
torch.set_default_tensor_type(torch.DoubleTensor)
class VanderPolODE(torch.nn.Module):
"""
From Manichev et al 2019
x0 = [-1, 1]
t = [0, 4.2*mu)
"""
def __init__(self, mu):
super().__init__()
self.mu = torch.tensor(mu)
def forward(self, t, y):
f = torch.empty(y.shape)
f[..., 0] = y[..., 1]
f[..., 1] = -y[..., 0] + self.mu * (1.0 - y[..., 0] ** 2.0) * y[..., 1]
df = torch.empty(y.shape + y.shape[-1:])
df[..., 0, 0] = 0
df[..., 0, 1] = 1
df[..., 1, 0] = -1 - 2.0 * self.mu * y[..., 0] * y[..., 1]
df[..., 1, 1] = self.mu * (1.0 - y[..., 0] ** 2.0)
return f, df
if __name__ == "__main__":
# Common inputs
y0 = torch.tensor([[-1.0, 1]])
period = lambda m: (3.0 - 2 * np.log(2)) * m + 2.0 * np.pi / mu ** (1.0 / 3)
# Number of vectorized time steps
time_chunk = 10
# Test for non-stiff version
mu = 1.0
times = torch.linspace(0, period(mu) * 10, 10000).unsqueeze(-1)
model = VanderPolODE(mu)
res_exp = ode.odeint(model, y0, times, method="forward-euler",
block_size = time_chunk, guess_type = "previous")
res_imp = ode.odeint(
model, y0, times, method="backward-euler",
block_size = time_chunk, guess_type = "previous"
)
plt.figure()
plt.plot(times, res_exp[:, 0, 0], label="Forward")
plt.plot(times, res_imp[:, 0, 0], label="Backward")
plt.xlabel("time")
plt.ylabel(r"$y_0$")
plt.legend(loc="best")
plt.title("Not stiff")
plt.show()
# Test for moderately stiff version
mu = 14.0
times = torch.linspace(0, period(mu) * 10, 10000).unsqueeze(-1)
model = VanderPolODE(mu)
res_exp = ode.odeint(model, y0, times, method="forward-euler",
block_size = time_chunk, guess_type = "previous")
res_imp = ode.odeint(
model, y0, times, method="backward-euler",
block_size = time_chunk, guess_type = "previous"
)
plt.figure()
plt.plot(times, res_exp[:, 0, 0], label="Forward")
plt.plot(times, res_imp[:, 0, 0], label="Backward")
plt.legend(loc="best")
plt.xlabel("time")
plt.ylabel(r"$y_0$")
plt.title("Moderately stiff")
plt.show()
# Test for highly stiff version (explicit just explodes)
mu = 30.0
times = torch.linspace(0, period(mu) * 10 * 10.0 / 16, 20000).unsqueeze(-1)
model = VanderPolODE(mu)
res_imp = ode.odeint(
model, y0, times, method="backward-euler",
block_size = time_chunk, guess_type = "previous"
)
plt.figure()
plt.plot(times, res_imp[:, 0, 0], label="Backward")
plt.legend(loc="best")
plt.xlabel("time")
plt.ylabel(r"$y_0$")
plt.title("Very stiff")
plt.show()
Statistical inference with a simple ODE
This example (examples/ode/trajectory.py) demonstrates the basic application of pyoptmat to find parameter distributions for systems of stochastic ODEs that match uncertain data. This example demonstrates these concepts using trajectories from a canon fired at a random angle and speed, drawn from known distributions. The example tries to infer the distributions of angle and speed given some number of observed trajectories. This problem is small enough that you can run it using either the adjoint method or AD to calculate the sensitivities, to compare the relative performance of both approaches.
#!/usr/bin/env python3
"""
This example demonstrates using the basic capabilities of pyopmat to
find parameter distributions for a system of ODEs to match variable
measured data. The particular example here is a cannon firing with a
random velocity, parameterized here with a random speed and launch
angle. The observed "data" is the resulting trajectory of the cannon ball,
starting from the point of launch and ending when it hits the ground.
In this case, the x-coordinate represents the distance from the cannon
and the y-coordinate the height above the initial launch point.
This example further applies some additional white noise on top of the
trajectory "measurements" representing random experimental error in
measuring or recording the data.
The goal of the variational inference is to recover the statistical
distribution of launch speeds and angles by observing some number of
trajectories. The example also tries to recover the scale of the
random, white noise superimposed on top of the measured trajectories.
The example, as setup below, observes 50 random trajectories
and uses pyro's SVI algorithm to infer the angle and speed distributions.
The example plots the results and compares the inferred distributions to
the known actual posteriors.
"""
import matplotlib.pyplot as plt
import torch
import pyro
from pyro.nn import PyroSample
import pyro.distributions as dist
from pyro.infer import SVI, Trace_ELBO, Predictive, JitTrace_ELBO
import pyro.optim as optim
import pyro.distributions.constraints as constraints
from tqdm import tqdm
import sys
sys.path.append("../..")
from pyoptmat import ode
# Gravity
g = torch.tensor(0.1)
# Actual location and scale of the speed and launch angle
# True posteriors are normal distributions
v_loc_act = 2.0
v_scale_act = 0.03
a_loc_act = 0.7
a_scale_act = 0.04
# Prior normal distribution for the speed and launch angle
v_loc_prior = 1.5
v_scale_prior = 0.1
a_loc_prior = 0.3
a_scale_prior = 0.1
# White noise applied to the observations
eps_act = 0.05
# Prior for the noise
eps_prior = 0.1 # Just measure variance in data...
def model_act(times):
"""
times: ntime x nbatch
trajectories: ntime x nbatch x 2
"""
v = pyro.sample("v", dist.Normal(v_loc_act, v_scale_act))
a = pyro.sample("a", dist.Normal(a_loc_act, a_scale_act))
simulated = pyro.sample(
"data",
dist.Normal(
torch.stack(
(
v * torch.cos(a) * times,
v * torch.sin(a) * times - 0.5 * g * times ** 2.0,
)
).T,
eps_act,
),
)
return simulated
class Integrator(pyro.nn.PyroModule):
def __init__(self, eqn, y0, extra_params=[], block_size = 1):
super().__init__()
self.eqn = eqn
self.y0 = y0
self.extra_params = extra_params
self.block_size = block_size
def forward(self, times):
return ode.odeint_adjoint(
self.eqn,
self.y0,
times,
block_size = self.block_size,
extra_params=self.extra_params,
)
class ODE(pyro.nn.PyroModule):
def __init__(self, v, a):
super().__init__()
self.v = v
self.a = a
def forward(self, t, y):
f = torch.empty(y.shape)
# Acceleration
f[..., 0] = self.v * torch.cos(self.a)
f[..., 1] = self.v * torch.sin(self.a) - g * t
# Nice ODE lol
df = torch.zeros(y.shape + y.shape[-1:])
return f, df
class Model(pyro.nn.PyroModule):
def __init__(
self,
maker,
names,
loc_priors,
scale_priors,
loc_suffix="_loc",
scale_suffix="_scale",
param_suffix="_param",
bot_suffix="_bot",
):
super().__init__()
self.maker = maker
self.loc_suffix = loc_suffix
self.scale_suffix = scale_suffix
self.param_suffix = param_suffix
self.bot_suffix = bot_suffix
self.loc_priors = loc_priors
self.scale_priors = scale_priors
self.eps_prior = eps_prior
self.names = names
# Setup both levels of distributions
self.bot_vars = names
self.top_vars = []
for var, loc, scale in zip(names, loc_priors, scale_priors):
setattr(self, var + loc_suffix, PyroSample(dist.Normal(loc, scale)))
self.top_vars.append(var + loc_suffix)
setattr(self, var + scale_suffix, PyroSample(dist.LogNormal(scale, 1)))
self.top_vars.append(var + scale_suffix)
setattr(
self,
var,
PyroSample(
lambda self, var=var: dist.Normal(
getattr(self, var + loc_suffix),
getattr(self, var + scale_suffix),
)
),
)
# Setup noise
self.eps = PyroSample(dist.LogNormal(eps_prior, 1))
self.extra_param_names = []
def forward(self, times, actual=None):
y0 = torch.zeros((times.shape[1],) + (2,))
curr = self.sample_top()
eps = self.eps
with pyro.plate("trials", times.shape[1]):
bmodel = self.maker(*self.sample_bot(), extra_params=self.gen_extra())
simulated = bmodel(times)
with pyro.plate("time", times.shape[0]):
pyro.sample("obs", dist.Normal(simulated, eps).to_event(1), obs=actual)
return simulated
def sample_top(self):
return [getattr(self, name) for name in self.top_vars]
def sample_bot(self):
return [getattr(self, name) for name in self.bot_vars]
def make_guide(self):
def guide(times, actual=None):
top_loc_samples = []
top_scale_samples = []
for name, loc, scale in zip(self.names, self.loc_priors, self.scale_priors):
loc_param = pyro.param(
name + self.loc_suffix + self.param_suffix, torch.tensor(loc)
)
scale_param = pyro.param(
name + self.scale_suffix + self.param_suffix,
torch.tensor(scale),
constraint=constraints.positive,
)
top_loc_samples.append(
pyro.sample(name + self.loc_suffix, dist.Delta(loc_param))
)
top_scale_samples.append(
pyro.sample(name + self.scale_suffix, dist.Delta(scale_param))
)
eps_param = pyro.param(
"eps" + self.param_suffix,
torch.tensor(self.eps_prior),
constraint=constraints.positive,
)
eps_sample = pyro.sample("eps", dist.Delta(eps_param))
with pyro.plate("trials", times.shape[1]):
for name, loc_sample, scale_sample, loc_prior, scale_prior in zip(
self.names,
top_loc_samples,
top_scale_samples,
self.loc_priors,
self.scale_priors,
):
ll_param = pyro.param(
name + self.param_suffix,
torch.ones(times.shape[1]) * torch.tensor(loc_prior),
)
pyro.sample(name, dist.Delta(ll_param))
self.extra_param_names = [var + self.param_suffix for var in self.names]
return guide
def gen_extra(self):
return [pyro.param(name).unconstrained() for name in self.extra_param_names]
if __name__ == "__main__":
# Number of samples to provide
nsamples = 50
# Maximum and number of time steps
tmax = 20.0
tnum = 100
# Number of vectorized time steps to evaluate at once
time_block = 50
time = torch.linspace(0, tmax, tnum)
times = torch.empty(tnum, nsamples)
data = torch.empty(tnum, nsamples, 2)
with torch.no_grad():
for i in range(nsamples):
times[:, i] = time
data[:, i] = model_act(time)
plt.figure()
plt.plot(data[:, :, 0], data[:, :, 1])
plt.xlabel("x-coordinate")
plt.ylabel("y-coordinate")
plt.title("Trajectory data")
plt.show()
pyro.clear_param_store()
def maker(v, a, **kwargs):
return Integrator(ODE(v, a), torch.zeros(nsamples, 2),
block_size = time_block, **kwargs)
# Setup the model
model = Model(
maker, ["v", "a"], [v_loc_prior, a_loc_prior], [v_scale_prior, a_scale_prior]
)
# Optimization hyperparameters: learning rate, number of iterations, and
# number of samples for calculating the ELBO
lr = 5.0e-3
niter = 250
num_samples = 1
guide = model.make_guide()
# Init guide
guide(times)
optimizer = optim.ClippedAdam({"lr": lr})
l = Trace_ELBO(num_particles=num_samples)
svi = SVI(model, guide, optimizer, loss=l)
t = tqdm(range(niter))
loss_hist = []
for i in t:
loss = svi.step(times, data)
loss_hist.append(loss)
t.set_description("Loss: %3.2e" % loss)
print("Inferred distributions:")
print(
"Velocity mean: %4.3f, actual %4.3f"
% (pyro.param("v_loc_param").data, v_loc_act)
)
print(
"Velocity scale: %4.3f, actual %4.3f"
% (pyro.param("v_scale_param").data, v_scale_act)
)
print(
"Angle mean: %4.3f, actual %4.3f" % (pyro.param("a_loc_param").data, a_loc_act)
)
print(
"Angle scale: %4.3f, actual %4.3f"
% (pyro.param("a_scale_param").data, a_scale_act)
)
print("White noise: %4.3f, actual %4.3f" % (pyro.param("eps_param").data, eps_act))
plt.figure()
plt.plot(loss_hist)
plt.xlabel("Iteration")
plt.ylabel("Loss")
plt.title("Convergence diagram")
plt.show()
print("")
nsample = 1
predict = Predictive(model, guide=guide, num_samples=nsample, return_sites=("obs",))
with torch.no_grad():
samples = predict(times)["obs"]
min_x, _ = torch.min(samples[0, :, :, 0], 1)
max_x, _ = torch.max(samples[0, :, :, 0], 1)
min_y, _ = torch.min(samples[0, :, :, 1], 1)
max_y, _ = torch.max(samples[0, :, :, 1], 1)
plt.figure()
plt.plot(times, data[:, :, 1], "k-", lw=0.5)
plt.fill_between(time, min_y, max_y, alpha=0.75)
plt.xlabel("Time")
plt.ylabel("y coordinate")
plt.title("Height versus time")
plt.show()
plt.figure()
plt.plot(times, data[:, :, 0], "k-", lw=0.5)
plt.fill_between(time, min_x, max_x, alpha=0.75)
plt.xlabel("Time")
plt.ylabel("x coordinate")
plt.title("Distance versus time")
plt.show()
Hogdkin-Huxley coupled neuron model
An example, scalable system of ODEs for benchmarking.
#!/usr/bin/env python
"""
This example integrates a more complicated system of ODEs:
a model of coupled neurons responding to a cyclic
electrical current (the Hodgkin and Huxley model, as
described by :cite:`schwemmer2012theory`).
The number of neurons, current cycles, time steps per
cycle, vectorized time steps, etc. can be customized.
This is a decent benchmark problem for examining the performance
of pyoptmat on your machine.
"""
import sys
sys.path.append('../..')
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
from pyoptmat import ode, experiments, utility
import time
# Use doubles
torch.set_default_tensor_type(torch.DoubleTensor)
# Select device to run on
if torch.cuda.is_available():
dev = "cuda:0"
else:
dev = "cpu"
device = torch.device(dev)
def driving_current(I_max, R, rate, thold, chold, Ncycles, nload,
nhold, neq):
"""
Setup cyclic current histories
Args:
I_max (tuple): (min,max) uniform distribution of max currents
R (tuple): (min,max) uniform distribution of load ratios
rate (tuple): (min, max) uniform distribution of current rates
thold (tuple): (min, max) uniform distribution of hold at max current times
chold (tuple): (min, max) uniform distribution of hold at min current times
Ncycles (int): number of load cycles
nload (int): number of time steps for current ramp
nhold (int): number of time steps for current holds
neq (int): number of independent current histories
"""
Ntotal = (4*nload + 2*nhold) * Ncycles + 1
times = torch.empty((Ntotal,neq))
I = torch.empty((Ntotal,neq))
for i in range(neq):
cycle = experiments.generate_random_cycle(
I_max, R, rate, thold, chold)
ti, Ii, ci = experiments.sample_cycle_normalized_times(
cycle, Ncycles, nload, nhold)
times[:,i] = torch.tensor(ti)
I[:,i] = torch.tensor(Ii)
return utility.ArbitraryBatchTimeSeriesInterpolator(
times.to(device), I.to(device)), times.to(device), I.to(device)
class HodgkinHuxleyCoupledNeurons(torch.nn.Module):
"""
As described by :cite:`schwemmer2012theory`
.. math::
F(y) = \\begin{bmatrix} F_1(y_1)
\\\\ F_2(y_2)
\\\\ \\vdots
\\\\ F_n(y_n)
\\end{bmatrix}
with
.. math::
y = \\begin{bmatrix} y_1
\\\ y_2
\\\ \\vdots
\\\ y_n
\\end{bmatrix}
and
.. math::
F_i(y_i) = \\begin{bmatrix} V_i
\\\\ m_i
\\\\ h_i
\\\\ n_i
\\end{bmatrix}
and
.. math::
F_i(y_i) = \\begin{bmatrix} \\frac{1}{C_i}(-g_{Na,i}m_i^3h_i(V_i-E_{Na,i})-g_{K,i}n_i^4(V_i-E_{k,i})-g_{L,i}(V_i-E_{L,i})+I(t) + \\Delta V_{ij})
\\\\ \\frac{m_{\\infty,i}-m_i}{\\tau_{m,i}}
\\\\ \\frac{h_{\\infty,i}-h_i}{\\tau_{h,i}}
\\\\ \\frac{n_{\\infty,i}-n_i}{\\tau_{n,i}}
\\end{bmatrix}
with
.. math::
\\Delta V_{ij} = \\sum_{j=1}^{n_{neurons}}\\frac{g_{C,i}(V_i-V_j)}{C_i}
"""
def __init__(self, C, g_Na, E_Na, g_K, E_K, g_L, E_L, m_inf,
tau_m, h_inf, tau_h, n_inf, tau_n, g_C, I):
super().__init__()
self.C = nn.Parameter(C)
self.g_Na = nn.Parameter(g_Na)
self.E_Na = nn.Parameter(E_Na)
self.g_K = nn.Parameter(g_K)
self.E_K = nn.Parameter(E_K)
self.g_L = nn.Parameter(g_L)
self.E_L = nn.Parameter(E_L)
self.m_inf = nn.Parameter(m_inf)
self.tau_m = nn.Parameter(tau_m)
self.h_inf = nn.Parameter(h_inf)
self.tau_h = nn.Parameter(tau_h)
self.n_inf = nn.Parameter(n_inf)
self.tau_n = nn.Parameter(tau_n)
self.g_C = nn.Parameter(g_C)
self.I = I
self.n_neurons = self.g_C.shape[0]
self.n_equations = self.n_neurons * 4
def forward(self, t, y):
"""
Evaluate the system of ODEs at a particular state
Args:
t (torch.tensor): current time
y (torch.tensor): current state
Returns:
y_dot (torch.tensor): current ODEs
J (torch.tensor): current jacobian
"""
Ic = self.I(t)
V = y[...,0::4].clone()
m = y[...,1::4].clone()
h = y[...,2::4].clone()
n = y[...,3::4].clone()
ydot = torch.zeros_like(y)
# V
ydot[...,0::4] = 1.0 / self.C[None,...] * (
-self.g_Na[None,...] * m**3.0 * h * (V - self.E_Na[None,...])
-self.g_K[None,...] * n**4.0 * (V - self.E_K[None,...])
-self.g_L[None,...] * (V - self.E_L[None,...]) + Ic[...,None])
# Coupling term
dV = torch.sum(self.g_C[None,...]*(V[...,:,None] - V[...,None,:]) / self.C[None,...], dim = -1)
ydot[...,0::4] += dV
# m
ydot[...,1::4] = (self.m_inf - m) / self.tau_m
# h
ydot[...,2::4] = (self.h_inf - h) / self.tau_h
# n
ydot[...,3::4] = (self.n_inf - n) / self.tau_n
J = torch.zeros(y.shape + y.shape[-1:], device = ydot.device)
# V, V
J[...,0::4,0::4] = torch.diag_embed(1.0 / self.C[None,...] * (
-self.g_L[None,...]
-self.g_Na[None,...]*h*m**3.0
-self.g_K[None,...]*n**4.0))
# Coupling term
J[...,0::4,0::4] -= self.g_C[None,...] / self.C[None,...]
J[...,0::4,0::4] += torch.eye(self.n_neurons, device = ydot.device).expand(
*ydot.shape[:-1], -1, -1) * torch.sum(self.g_C / self.C)
# V, m
J[...,0::4,1::4] = torch.diag_embed(-1.0 / self.C[None,...] * (
3.0 * self.g_Na[None,...] * h * m**2.0 * (-self.E_Na[None,...] + V)))
# V, h
J[...,0::4,2::4] = torch.diag_embed(-1.0 / self.C[None,...] * (
self.g_Na[None,...] * m**3.0 * (-self.E_Na[None,...] + V)))
# V, n
J[...,0::4,3::4] = torch.diag_embed(-1.0 / self.C[None,...] * (
4.0 * self.g_K[None,...] * n**3.0 * (-self.E_K[None,...] + V)))
# m, m
J[...,1::4,1::4] = torch.diag(-1.0 / self.tau_m)
# h, h
J[...,2::4,2::4] = torch.diag(-1.0 / self.tau_h)
# n, n
J[...,3::4,3::4] = torch.diag(-1.0 / self.tau_n)
return ydot, J
if __name__ == "__main__":
# Batch size
nbatch = 100
# Time chunk size
time_block = 50
# Number of neurons
neq = 10
# Number of cycles
N = 5
# Number of time steps per cycle
n = 40
# Setup the driving current
current, times, discrete_current = driving_current([0.1,1],
[-1,0.9],
[0.5,1.5],
[0,1],
[0,1],
N,
n,
n,
nbatch)
plt.plot(times.cpu().numpy()[:,::4], discrete_current.cpu().numpy()[:,::4])
plt.xlabel("Time")
plt.ylabel("Current")
plt.show()
# Setup model and initial conditions
nsteps = times.shape[0]
model = HodgkinHuxleyCoupledNeurons(
torch.rand((neq,), device = device),
torch.rand((neq,), device = device),
torch.rand((neq,), device = device),
torch.rand((neq,), device = device),
torch.rand((neq,), device = device),
torch.rand((neq,), device = device),
torch.rand((neq,), device = device),
torch.rand((neq,), device = device),
torch.rand((neq,), device = device) * 5.0,
torch.rand((neq,), device = device),
torch.rand((neq,), device = device) * 15.0,
torch.rand((neq,), device = device),
torch.rand((neq,), device = device) * 10.0,
torch.rand((neq,), device = device) * 0.01,
current)
y0 = torch.rand((nbatch,model.n_equations), device = device)
# Include a backward pass to give a better example of timing
t1 = time.time()
res_imp = ode.odeint_adjoint(model, y0, times, block_size = time_block)
loss = torch.norm(res_imp)
loss.backward()
etime = time.time() - t1
print("%i batch size, %i blocked time steps, %i neurons, %i time steps: %f s" % (nbatch, time_block, neq, nsteps, etime))
plt.plot(times[:,0].detach().cpu().numpy(), res_imp[:,0,0::4].detach().cpu().numpy())
plt.xlabel("Time")
plt.ylabel("Voltage")
plt.show()
Temperature dependent parameters
examples/structural-material-models/temperature_dependence.py illustrates how to setup and run structural material models with temperature dependent parameters.
#!/usr/bin/env python3
"""
This is a simple demonstration of how pyopmat models can vary with
temperature. The example simulates the response of a simple, temperature
dependent, viscoplastic model for Alloy 617 developed by
Messner, Phan, and Sham under uniaxial, monotonic tensile strain
at several different temperatures.
"""
import sys
sys.path.append("../..")
import torch
import matplotlib.pyplot as plt
from pyoptmat import models, flowrules, temperature, experiments
torch.set_default_tensor_type(torch.DoubleTensor)
if __name__ == "__main__":
# This example illustrates the temperature and rate sensitivity of
# Alloy 617 with the model given in:
# Messner, Phan, and Sham. IJPVP 2019.
# Setup test conditions
ntemps = 5
temps = torch.linspace(750, 950, ntemps) + 273.15
elimits = torch.ones(ntemps) * 0.25
erates = torch.ones(ntemps) * 8.33e-5
nsteps = 200
times, strains, temperatures, cycles = experiments.make_tension_tests(
erates, temps, elimits, nsteps
)
# Perfect viscoplastic model
A = torch.tensor(-8.679)
B = torch.tensor(-0.744)
mu = temperature.PolynomialScaling(
torch.tensor([-1.34689305e-02, -5.18806776e00, 7.86708330e04])
)
b = torch.tensor(2.474e-7)
k = torch.tensor(1.38064e-20)
eps0 = torch.tensor(1e10)
E = temperature.PolynomialScaling(
torch.tensor([-3.48056033e-02, -1.44398964e01, 2.06464967e05])
)
fr = flowrules.PerfectViscoplasticity(
temperature.KMRateSensitivityScaling(A, mu, b, k),
temperature.KMViscosityScaling(A, B, mu, eps0, b, k),
)
model = models.InelasticModel(E, fr)
integrator = models.ModelIntegrator(model)
stresses = integrator.solve_strain(times, strains, temperatures)[:, :, 0]
print("Temperature scaling")
for ei, Ti, si in zip(
strains.T.numpy(), temperatures.T.numpy(), stresses.T.numpy()
):
plt.plot(ei, si, label="T = %3.0fK" % Ti[0])
plt.legend(loc="best")
plt.xlabel("Strain (mm/mm)")
plt.ylabel("Stress (Mpa)")
plt.show()
Stress and strain control
The example in examples/structural-material-models/stress_strain_control.py demonstrates how to run simulations of structural tests conducted under strain and stress control. pyoptmat can run both types of tests and correctly provide derivatives using the adjoint method
#!/usr/bin/env python3
"""
This example demonstrates the ability of pyoptmat to integrate material models under
strain control, stress control, or a combination of both.
The example first simulates standard, strain-rate controlled tensile curves
for a simple material model for Alloy 617 developed by Messner, Phan, and Sham. These
simulations then take time, strain, and temperature as inputs and outputs the
resulting stresses.
Then, the example uses the stresses from the strain controlled simulations
to repeat the simulations under stress control. These simulations therefore
take as input time, temperature, and stress and output strain. The plot demonstrates that
the strain controlled and stress controlled simulations produce identical results.
Finally, the example integrates the model twice for each temperature, once in strain
control and once in stress control, but integrated all the experiments at once.
This example then demos the ability of pyoptmat to integrate both stress and strain
controlled tests in the same vectorized calculation.
"""
import sys
sys.path.append("../..")
import torch
import matplotlib.pyplot as plt
from pyoptmat import models, flowrules, temperature, experiments, hardening
torch.set_default_tensor_type(torch.DoubleTensor)
if __name__ == "__main__":
# This example illustrates the temperature and rate sensitivity of
# Alloy 617 with the model given in:
# Messner, Phan, and Sham. IJPVP 2019.
# Setup test conditions
ntemps = 5
temps = torch.linspace(750, 950, ntemps) + 273.15
elimits = torch.ones(ntemps) * 0.25
erates = torch.ones(ntemps) * 8.33e-5
nsteps = 200
times, strains, temperatures, cycles = experiments.make_tension_tests(
erates, temps, elimits, nsteps
)
# Perfect viscoplastic model
A = torch.tensor(-8.679)
B = torch.tensor(-0.744)
mu = temperature.PolynomialScaling(
torch.tensor([-1.34689305e-02, -5.18806776e00, 7.86708330e04])
)
b = torch.tensor(2.474e-7)
k = torch.tensor(1.38064e-20)
eps0 = torch.tensor(1e10)
E = temperature.PolynomialScaling(
torch.tensor([-3.48056033e-02, -1.44398964e01, 2.06464967e05])
)
ih = hardening.VoceIsotropicHardeningModel(
temperature.ConstantParameter(torch.tensor(150.0)),
temperature.ConstantParameter(torch.tensor(10.0)),
)
kh = hardening.NoKinematicHardeningModel()
fr = flowrules.IsoKinViscoplasticity(
temperature.KMRateSensitivityScaling(A, mu, b, k),
temperature.KMViscosityScaling(A, B, mu, eps0, b, k),
temperature.ConstantParameter(0),
ih,
kh,
)
model = models.InelasticModel(E, fr)
integrator = models.ModelIntegrator(model)
stresses = integrator.solve_strain(times, strains, temperatures)[:, :, 0]
# Now do it backwards!
strains_prime = integrator.solve_stress(times, stresses, temperatures)[:, :, 0]
plt.figure()
for ei, epi, Ti, si in zip(
strains.T.numpy(),
strains_prime.T.numpy(),
temperatures.T.numpy(),
stresses.T.numpy(),
):
(l,) = plt.plot(ei, si, label="T = %3.0fK" % Ti[0])
plt.plot(epi, si, label=None, ls="--", color=l.get_color())
plt.legend(loc="best")
plt.xlabel("Strain (mm/mm)")
plt.ylabel("Stress (Mpa)")
plt.title("Comparison between strain and stress control")
plt.show()
# Now do it both ways at once!
big_times = torch.cat((times, times), 1)
big_temperatures = torch.cat((temperatures, temperatures), 1)
big_data = torch.cat((strains, stresses), 1)
big_types = torch.zeros(2 * ntemps)
big_types[ntemps:] = 1
both_results = integrator.solve_both(
big_times, big_temperatures, big_data, big_types
)
plt.figure()
for i in range(ntemps):
(l,) = plt.plot(
strains[:, i], both_results[:, i, 0], label="T = %3.0fK" % Ti[i]
)
plt.plot(
both_results[:, i + ntemps, 0],
stresses[:, i],
label=None,
ls="--",
color=l.get_color(),
)
plt.xlabel("Strain (mm/mm)")
plt.ylabel("Stress (MPa)")
plt.legend(loc="best")
plt.title("Running both strain and stress control at once")
plt.show()
Creep tests
examples/structural-material-models/stress_control_creep.py is an example of simulating stress-controlled creep tests, with the idea of using such data to train constitutive models.
#!/usr/bin/env python3
"""
An example of how to load a model in stress control. The example
simulates the behavior of a simple viscoplastic model for the high
temperature deformation of Alloy 617 developed by Messner, Phan,
and Sham under creep conditions. The simulations then load the
model up to a given stress and then hold the model at that stress
for a long period of time. The experimental response is often
given as a creep curve -- a plot of strain versus time during
the hold at constant stress. This example plots a modification, giving
the total accumulated strain as a function of time, including the
strain accumulated during the load up to the constant stress.
"""
import sys
sys.path.append("../..")
import torch
import matplotlib.pyplot as plt
from pyoptmat import models, flowrules, temperature, experiments, hardening
torch.set_default_tensor_type(torch.DoubleTensor)
if __name__ == "__main__":
# This example illustrates the temperature and rate sensitivity of
# Alloy 617 with the model given in:
# Messner, Phan, and Sham. IJPVP 2019.
# Setup test conditions
target_temperature = 950.0
target_stresses = torch.linspace(80, 140, 13)
target_times = torch.ones_like(target_stresses) * 100000 * 3600.0
nbatch = len(target_stresses)
nsteps_load = 50
nsteps_hold = 100
nsteps = nsteps_load + nsteps_hold
loading_rate = 1.0
times, stresses, temperatures, cycles = experiments.make_creep_tests(
target_stresses,
torch.ones_like(target_stresses) * target_temperature,
torch.ones_like(target_stresses) * loading_rate,
target_times,
nsteps_load,
nsteps_hold,
)
# Model with hardening
A = torch.tensor(-8.679)
B = torch.tensor(-0.744)
mu = temperature.PolynomialScaling(
torch.tensor([-1.34689305e-02, -5.18806776e00, 7.86708330e04])
)
b = torch.tensor(2.474e-7)
k = torch.tensor(1.38064e-20)
eps0 = torch.tensor(1e10)
E = temperature.PolynomialScaling(
torch.tensor([-3.48056033e-02, -1.44398964e01, 2.06464967e05])
)
ih = hardening.VoceIsotropicHardeningModel(
temperature.ConstantParameter(torch.tensor(150.0)),
temperature.ConstantParameter(torch.tensor(10.0)),
)
kh = hardening.NoKinematicHardeningModel()
fr = flowrules.IsoKinViscoplasticity(
temperature.KMRateSensitivityScaling(A, mu, b, k),
temperature.KMViscosityScaling(A, B, mu, eps0, b, k),
temperature.ConstantParameter(0),
ih,
kh,
)
model = models.InelasticModel(E, fr)
integrator = models.ModelIntegrator(model)
strains = integrator.solve_stress(times, stresses, temperatures)[:, :, 0]
plt.plot(times / 3600.0, strains)
plt.xlabel("Time (hrs)")
plt.ylabel("Total strain (mm/mm)")
plt.show()
AD versus adjoint comparison
examples/performance/ad_vs_adjoint.py compares the efficiency of computing the parameter gradients using the AD versus the adjoint method. It demonstrates that the adjoint method is always better for practical problems: it is both faster and requires far less memory on the device.
#!/usr/bin/env python3
"""
A comparison of the efficiency of Pytorch AD versus the adjoint method
for calculating the parameter gradient of a model.
The demo runs `n_tests` tension tests using `n_steps` timesteps to
integrate each test. Given these two size parameter as input, the
code:
1. Generates the input tensors describing the time/strain history of
each tensile test
2. Runs the same process twice, once using Pytorch AD to calculate the
gradient and then again using the adjoint method
a. Integrate the tests through time (forward pass)
b. Calculate the norm of the resulting integrated stresses, which
is a reasonable surrogate for the actual loss functions used
during training
c. Calculate the gradient of that norm value with respect to the model
parameters
3. Compare the walltime and memory costs of running the analysis using
the two methods.
Running this script on a NVIDIA GeForce RTX 3070 Ti gives the following
results for `n_tests = 500`:
======= ============= ============= ================== =================
n_time AD, time (s) AD, mem (MB) Adjoint, time (s) Adjoint, mem (MB)
======= ============= ============= ================== =================
100 9.266 274.4 7.057 18.42
200 16.93 475.6 12.95 37.58
300 24.43 674.8 18.92 55.21
400 31.96 872.0 25.01 73.45
500 40.59 1062 31.97 93.76
750 60.26 1527 46.13 137.8
1000 77.55 2027 62.43 188.0
======= ============= ============= ================== =================
Running for fixed `n_time = 100` gives:
======== ============= ============= ================== =================
n_tests AD, time (s) AD, mem (MB) Adjoint, time (s) Adjoint, mem (MB)
======== ============= ============= ================== =================
100 8.963 68.27 6.859 3.781
200 9.193 131.9 6.993 7.512
300 9.084 171.7 6.962 11.10
400 9.257 234.6 6.947 14.83
500 9.175 274.4 7.004 18.42
750 9.280 410.1 7.034 27.60
1000 9.424 545.2 7.084 37.72
5000 9.984 2686.7 7.613 187.3
======== ============= ============= ================== =================
"""
# Size of problem to run
n_tests = 500
n_time = 500
import sys
sys.path.append("../..")
import time
from pyoptmat import models, flowrules, temperature, experiments, hardening, optimize
from pyoptmat.temperature import ConstantParameter as CP
import torch
torch.set_default_tensor_type(torch.DoubleTensor)
if torch.cuda.is_available():
dev = "cuda:0"
else:
dev = "cpu"
device = torch.device(dev)
def model_maker(E, R, d, n, eta, s0, C, g, **kwargs):
"""
Make the model
"""
iso = hardening.VoceIsotropicHardeningModel(CP(R), CP(d))
kin = hardening.ChabocheHardeningModel(CP(C), CP(g))
flow = flowrules.IsoKinViscoplasticity(CP(n), CP(eta), CP(s0), iso, kin)
mod = models.InelasticModel(CP(E), flow)
return models.ModelIntegrator(mod, **kwargs).to(device)
if __name__ == "__main__":
# Model parameters to use
E = torch.tensor(150000.0, device=device)
R = torch.tensor(200.0, device=device)
d = torch.tensor(5.0, device=device)
n = torch.tensor(7.0, device=device)
eta = torch.tensor(300.0, device=device)
s0 = torch.tensor(50.0, device=device)
C = torch.tensor([10000.0, 5000.0], device=device)
g = torch.tensor([100.0, 50.0], device=device)
names = ["E", "R", "d", "n", "eta", "s0", "C", "g"]
values = [E, R, d, n, eta, s0, C, g]
# Setup the input data
rates = torch.ones(n_tests, device=device) * 1.0e-5
temps = torch.zeros(n_tests, device=device)
elimits = torch.ones(n_tests, device=device) * 0.2
times, strains, temps, cycles = experiments.make_tension_tests(
rates, temps, elimits, n_time
)
data = torch.stack((times, temps, strains)).to(device)
types = torch.tensor([experiments.exp_map["tensile"]] * n_tests, device=device)
control = torch.tensor([experiments.control_map["strain"]] * n_tests, device=device)
print("Running example with size %i x %i" % (n_tests, n_time))
print("")
# Do this twice (once for AD, once for adjoint)
for use_adjoint in [False, True]:
# Get the actual model
maker = lambda *p: model_maker(*p, use_adjoint=use_adjoint)
model = optimize.DeterministicModel(maker, names, values)
# Start timing/monitoring
tstart = time.time()
torch.cuda.reset_peak_memory_stats(device)
# Run the forward integration
pred = model(data, cycles, types, control)
# Calculate the loss surrogate
loss = torch.norm(pred)
# Calculate the gradient
loss.backward()
# Stop monitoring
tend = time.time()
mem = torch.cuda.max_memory_allocated() / 1024 / 1024
# Print statistics
if use_adjoint:
print("Adjoint method:")
else:
print("Torch AD:")
print("\tWalltime (s): %f" % (tend - tstart))
print("\tMax memory use (MB): %f" % mem)
print("")