Other sample problems

Optimizing deterministic material models

The example in examples/structural-inference/tension/deterministic repeats the example described in the detailed tutorial except targeting a deterministic, instead of a statistical, model. The model uses the same synthetic data used in the statistical inference tutorial but fits a deterministic best fit model to the data. The example illustrates how to use the package to train deterministic models and also demonstrates the potential utility of the L-BFGS optimizer when (a) you are training deterministic models and (b) you have sufficient GPU memory available.

#!/usr/bin/env python3

"""
    Helper functions for the structural material model inference with
    tension tests examples.
"""

import sys

sys.path.append("../../..")

import numpy as np
import numpy.random as ra

import xarray as xr

import torch
from pyoptmat import models, flowrules, hardening, optimize
from pyoptmat.temperature import ConstantParameter as CP

from tqdm import tqdm

import tqdm

import warnings

warnings.filterwarnings("ignore")

# Use doubles
torch.set_default_tensor_type(torch.DoubleTensor)

# Actual parameters
E_true = 150000.0
R_true = 200.0
d_true = 5.0
n_true = 7.0
eta_true = 300.0
s0_true = 50.0

# Scale factor used in the model definition
sf = 0.5

# Select device to run on
if torch.cuda.is_available():
    dev = "cuda:0"
else:
    dev = "cpu"
device = torch.device(dev)


def make_model(E, n, eta, s0, R, d, device=torch.device("cpu"), **kwargs):
    """
    Key function for the entire problem: given parameters generate the model
    """
    isotropic = hardening.VoceIsotropicHardeningModel(
        CP(
            R,
            scaling=optimize.bounded_scale_function(
                (
                    torch.tensor(R_true * (1 - sf), device=device),
                    torch.tensor(R_true * (1 + sf), device=device),
                )
            ),
        ),
        CP(
            d,
            scaling=optimize.bounded_scale_function(
                (
                    torch.tensor(d_true * (1 - sf), device=device),
                    torch.tensor(d_true * (1 + sf), device=device),
                )
            ),
        ),
    )
    kinematic = hardening.NoKinematicHardeningModel()
    flowrule = flowrules.IsoKinViscoplasticity(
        CP(
            n,
            scaling=optimize.bounded_scale_function(
                (
                    torch.tensor(n_true * (1 - sf), device=device),
                    torch.tensor(n_true * (1 + sf), device=device),
                )
            ),
        ),
        CP(
            eta,
            scaling=optimize.bounded_scale_function(
                (
                    torch.tensor(eta_true * (1 - sf), device=device),
                    torch.tensor(eta_true * (1 + sf), device=device),
                )
            ),
        ),
        CP(
            s0,
            scaling=optimize.bounded_scale_function(
                (
                    torch.tensor(s0_true * (1 - sf), device=device),
                    torch.tensor(s0_true * (1 + sf), device=device),
                )
            ),
        ),
        isotropic,
        kinematic,
    )
    model = models.InelasticModel(
        CP(
            E,
            scaling=optimize.bounded_scale_function(
                (
                    torch.tensor(E_true * (1 - sf), device=device),
                    torch.tensor(E_true * (1 + sf), device=device),
                )
            ),
        ),
        flowrule,
    )

    return models.ModelIntegrator(model, **kwargs)


def generate_input(erates, emax, ntime):
    """
    Generate the times and strains given the strain rates, maximum strain, and number of time steps
    """
    strain = torch.repeat_interleave(
        torch.linspace(0, emax, ntime, device=device)[None, :], len(erates), 0
    ).T.to(device)
    time = strain / erates

    return time, strain


def downsample(rawdata, nkeep, nrates, nsamples):
    """
    Return fewer than the whole number of samples for each strain rate
    """
    ntime = rawdata[0].shape[1]
    return tuple(
        data.reshape(data.shape[:-1] + (nrates, nsamples))[..., :nkeep].reshape(
            data.shape[:-1] + (-1,)
        )
        for data in rawdata
    )


if __name__ == "__main__":
    # Running this script will regenerate the data
    ntime = 200
    emax = 0.5
    erates = torch.logspace(-2, -8, 7, device=device)
    nrates = len(erates)
    nsamples = 50

    scales = [0.0, 0.01, 0.05, 0.1, 0.15]

    times, strains = generate_input(erates, emax, ntime)

    for scale in scales:
        print("Generating data for scale = %3.2f" % scale)
        full_times = torch.empty((ntime, nrates, nsamples), device=device)
        full_strains = torch.empty_like(full_times)
        full_stresses = torch.empty_like(full_times)
        full_temperatures = torch.zeros_like(full_strains)

        for i in tqdm.tqdm(range(nsamples)):
            full_times[:, :, i] = times
            full_strains[:, :, i] = strains

            # True values are 0.5 with our scaling so this is easy
            model = make_model(
                torch.tensor(0.5, device=device),
                torch.tensor(ra.normal(0.5, scale), device=device),
                torch.tensor(ra.normal(0.5, scale), device=device),
                torch.tensor(ra.normal(0.5, scale), device=device),
                torch.tensor(ra.normal(0.5, scale), device=device),
                torch.tensor(ra.normal(0.5, scale), device=device),
            )

            with torch.no_grad():
                full_stresses[:, :, i] = model.solve_strain(
                    times, strains, full_temperatures[:, :, i]
                )[:, :, 0]

        full_cycles = torch.zeros_like(full_times, dtype=int, device=device)
        types = np.array(["tensile"] * (nsamples * len(erates)))
        controls = np.array(["strain"] * (nsamples * len(erates)))

        ds = xr.Dataset(
            {
                "time": (["ntime", "nexp"], full_times.flatten(-2, -1).cpu().numpy()),
                "strain": (
                    ["ntime", "nexp"],
                    full_strains.flatten(-2, -1).cpu().numpy(),
                ),
                "stress": (
                    ["ntime", "nexp"],
                    full_stresses.flatten(-2, -1).cpu().numpy(),
                ),
                "temperature": (
                    ["ntime", "nexp"],
                    full_temperatures.cpu().flatten(-2, -1).numpy(),
                ),
                "cycle": (["ntime", "nexp"], full_cycles.flatten(-2, -1).cpu().numpy()),
                "type": (["nexp"], types),
                "control": (["nexp"], controls),
            },
            attrs={"scale": scale, "nrates": nrates, "nsamples": nsamples},
        )

        ds.to_netcdf("scale-%3.2f.nc" % scale)

The example in examples/structural-inference/creep-fatigue/deterministic repeats a similar example using synthetic cyclic (creep-fatigue) test data and a pyoptmat.hardening.ChabocheHardeningModel viscoplastic material model.

Tutorial example for cyclic load

examples/structural-inference/creep-fatigue/statistical basically repeats the tutorial, but using synthetic cyclic test data a pyoptmat.hardening.ChabocheHardeningModel viscoplastic model capable of capturing kinematic hardening behavior.

Neural ODE example

Inferring the response of a coupled mass-spring-damper system using trajectory data (1) a physical ODE model and (2) a neural ODE model.

#!/usr/bin/env python3

"""
Sample problem trying to infer the response of a chain of serially
connected mass-spring-dashpot 1D elements, fixed at one end of the
chain and subject to a force on the other end.

The number of elements in the chain is `n_chain`.

This example
1. Generates `n_samples` trajectories under a random force described by
   ...  We separate out only the end displacements as observable,
   all the interior displacements are treated as hidden state.
2. Try to infer the mass/spring/dashpot constants and the system response
   using the analytical ODE as a model, given the end-displacement
   trajectories as input.
3. Again try to infer the response from the end displacement trajectories
   but now using a neural ODE as the model.

The integration will use `n_time` points in the integration.
"""

import itertools

import torch
from torch import nn

from functorch import vmap, jacrev

import tqdm
import matplotlib.pyplot as plt

import sys
sys.path.append("../..")
from pyoptmat import ode, utility, neuralode

if torch.cuda.is_available():
    dev = "cuda:0"
else:
    dev = "cpu"
device = torch.device(dev)

# I'm assuming single precision will be fine for this but for now use doubles
torch.set_default_tensor_type(torch.DoubleTensor)

class MassDamperSpring(torch.nn.Module):
    """
    Mass, spring, damper system of arbitrary size

    I use simple substitution to solve for the velocity and displacement
    in a first order system, so the size of the system is twice the size of the
    number of elements.

    Args:
        K (torch.tensor): (n_chain,) vector of stiffnesses
        C (torch.tensor): (n_chain,) vector of dampers
        M (torch.tensor): (n_chain,) vector of masses
        force (function): gives f(t)
    """
    def __init__(self, K, C, M, force, C_scale = 1.0e-4, M_scale = 1.0e-5):
        super().__init__()

        self.K = torch.nn.Parameter(K)
        self.C = torch.nn.Parameter(C)
        self.M = torch.nn.Parameter(M)
        self.force = force

        self.C_scale = C_scale
        self.M_scale = M_scale
        
        self.half_size = K.shape[0]
        self.size = 2 * self.half_size

    def forward(self, t, y):
        """
        Return the state rate

        Args:
            t (torch.tensor): (nchunk, nbatch) tensor of times
            y (torch.tensor): (nchunk, nbatch, size) tensor of state

        Returns:
            y_dot: (nchunk, nbatch, size) tensor of state rates
            J:     (nchunk, nbatch, size, size) tensor of Jacobians
        """
        ydot = torch.zeros_like(y)
        J = torch.zeros(y.shape + (self.size,), device = t.device)

        # Separate out for convenience
        u = y[...,:self.half_size]
        v = y[...,self.half_size:]
        
        # Differences
        du = torch.diff(u, dim = -1)
        dv = torch.diff(v, dim = -1)

        # Scaled properties
        M = self.M * self.M_scale
        C = self.C * self.C_scale
        
        # Rate
        # Velocity
        ydot[...,:self.half_size] = v
        
        # Springs
        ydot[...,self.half_size:-1] += self.K[...,:-1] * du / M[...,:-1]
        ydot[...,self.half_size+1:] += -self.K[...,:-1] * du / M[...,1:]
        ydot[...,-1] += -self.K[...,-1] * u[...,-1] / M[...,-1]
        ydot[...,self.half_size] += self.force(t) / M[...,0]

        # Dampers
        ydot[...,self.half_size:-1] += C[...,:-1] * dv / M[...,:-1]
        ydot[...,self.half_size+1:] += -C[...,:-1] * dv / M[...,1:]
        ydot[...,-1] += -C[...,-1] * v[...,-1] / M[...,-1]

        # Derivative
        # Velocity 
        J[...,:self.half_size,self.half_size:] += torch.eye(self.half_size, device = t.device)

        # Springs
        J[...,self.half_size:,:self.half_size] += torch.diag_embed(-self.K / M)
        J[...,self.half_size+1:,1:self.half_size] += torch.diag_embed(-self.K[...,:-1] / M[...,1:])
        J[...,self.half_size:,:self.half_size] += torch.diag_embed(self.K[...,:-1] / M[...,:-1], offset = 1)
        J[...,self.half_size:,:self.half_size] += torch.diag_embed(self.K[...,:-1] / M[...,1:], offset = -1)

        # Dampers
        J[...,self.half_size:,self.half_size:] += torch.diag_embed(-C / M)
        J[...,self.half_size+1:,self.half_size+1:] += torch.diag_embed(-C[...,:-1] / M[...,1:])
        J[...,self.half_size:,self.half_size:] += torch.diag_embed(C[...,:-1] / M[...,:-1], offset = 1)
        J[...,self.half_size:,self.half_size:] += torch.diag_embed(C[...,:-1] / M[...,1:], offset = -1)

        return ydot, J

    def initial_condition(self, nsamples):
        """
        I don't want to learn the initial condition (we could) 
        so just return deterministic zeros
        """
        return torch.zeros(nsamples, self.size, device = device)

def neural_ode_factory(n_hidden, n_layers, n_inter):
    """Make a neural ODE to try
    
    Args:
        n_hidden (int): number of hidden state variables
        n_layers (int): network depth
        n_inter (int): size of hidden layers
    """
    n_in = n_hidden + 2
    n_out = n_hidden + 1
    n_inter += 1

    mods = [nn.Linear(n_in, n_inter), nn.ReLU()
            ] + list(itertools.chain(*[[nn.Linear(n_inter, n_inter), nn.ReLU()] for i in range(n_layers)])
                    ) + [nn.Linear(n_inter, n_out)]

    return nn.Sequential(*mods)

def random_walk(time, mean, scale, mag):
    """
    Simulate a random walk with velocity of the given mean and scale
    """
    results = torch.zeros_like(time)
    for i in range(1, time.shape[0]):
        dt = time[i] - time[i-1]
        v = mag*torch.normal(mean, scale)
        results[i] = results[i-1] + v * dt

    return results

if __name__ == "__main__":
    # Basic parameters
    n_chain = 5     # Number of spring-dashpot-mass elements
    n_samples = 5   # Number of random force samples
    n_time = 512+1    # Number of time steps
    integration_method = 'backward-euler'
    direct_solver = "thomas" # Batched, block, bidiagonal direct solver method

    # Time chunking -- best value may vary on your system
    n_chunk = 2**7

    # Ending time
    t_end = 1.0

    # Mean and standard deviation for random force "velocity"
    force_mean = torch.tensor(0.0, device = device)
    force_std = torch.tensor(1.0, device = device)
    force_mag = torch.tensor(1.0, device = device)

    # True values of the mass, damping, and stiffness
    K = torch.rand(n_chain, device = device)
    C = torch.rand(n_chain, device = device)
    M = torch.rand(n_chain, device = device)

    # Time values
    time = torch.linspace(0, t_end, n_time, device = device).unsqueeze(-1).expand(n_time, n_samples)

    # 1. Generate the data

    # Generate the force values
    force = random_walk(time, force_mean.unsqueeze(0).expand(n_samples), force_std.unsqueeze(0).expand(n_samples), force_mag.unsqueeze(0).expand(n_samples))

    # Plot them
    plt.figure()
    plt.plot(time.cpu().numpy(), force.cpu().numpy())
    plt.show()

    # Interpolate in time
    force_continuous = utility.ArbitraryBatchTimeSeriesInterpolator(time, force)

    # Setup the ground truth model
    model = MassDamperSpring(K, C, M, force_continuous)

    # Generate the initial condition
    y0 = model.initial_condition(n_samples)

    # Generate the data
    with torch.no_grad():
        y_data = ode.odeint(model, y0, time, method = integration_method, 
                block_size = n_chunk,
                direct_solve_method = direct_solver)

    # The observations will just be the first entry
    observable = y_data[...,0]
    
    # Plot them
    plt.figure()
    plt.plot(time.cpu().numpy(), observable.cpu().numpy())
    plt.show()
    
    # 2. Infer with the actual model
    ode_model = MassDamperSpring(torch.rand(n_chain, device = device),
            torch.rand(n_chain, device = device),
            torch.rand(n_chain, device = device), force_continuous)
    
    # Training parameters...
    niter = 1000
    lr = 1.0e-3
    loss = torch.nn.MSELoss(reduction = "sum")

    # Optimization setup
    optim = torch.optim.Adam(ode_model.parameters(), lr)
    def closure():
        optim.zero_grad()
        pred = ode.odeint_adjoint(ode_model, y0, time, 
                method = integration_method, 
                block_size = n_chunk,
                direct_solve_method = direct_solver)
        obs = pred[...,0]
        lossv = loss(obs, observable)
        lossv.backward()
        return lossv

    # Optimization loop
    t = tqdm.tqdm(range(niter), total = niter, desc = "Loss:     ")
    loss_history = []
    for i in t:
        closs = optim.step(closure)
        loss_history.append(closs.detach().cpu().numpy())
        t.set_description("Loss: %3.2e" % loss_history[-1])

    # Plot the loss history
    plt.figure()
    plt.plot(loss_history)
    plt.show()

    # Make the final predictions
    with torch.no_grad():
        ode_preds = ode.odeint(ode_model, y0, time, method = integration_method, block_size = n_chunk)
        ode_obs = ode_preds[...,0]

    # Make a plot
    plt.figure()
    for i in range(n_samples):
        l, = plt.plot(time.cpu().numpy()[:,i], observable.cpu().numpy()[:,i])
        plt.plot(time.cpu().numpy()[:,i], ode_obs.cpu().numpy()[:,i], ls = '--', color = l.get_color())
    plt.show()

    # 2. Infer with a neural ODE
    # Setup the model
    n_hidden = n_chain # I don't know, seems reasonable
    n_layers = 3
    n_inter = n_chain # Same thing, seems reasonable
    
    nn = neural_ode_factory(n_hidden, n_layers, n_inter).to(device) 
    y0 = torch.zeros(n_samples, n_hidden+1, device = device)

    nn_model = neuralode.NeuralODE(nn, lambda t: force_continuous(t).unsqueeze(-1)) 

    # Training parameters
    niter = 1000
    lr = 1.0e-3
    loss = torch.nn.MSELoss(reduction = "sum")
    
    # Optimization setup
    optim = torch.optim.Adam(nn_model.parameters(), lr)
    def closure():
        optim.zero_grad()
        pred = ode.odeint_adjoint(nn_model, y0, time, method = integration_method, block_size = n_chunk)
        obs = pred[...,0]
        lossv = loss(obs, observable)
        lossv.backward()
        return lossv

    # Optimization loop
    t = tqdm.tqdm(range(niter), total = niter, desc = "Loss:     ")
    loss_history = []
    for i in t:
        closs = optim.step(closure)
        loss_history.append(closs.detach().cpu().numpy())
        t.set_description("Loss: %3.2e" % loss_history[-1])

    # Plot the loss history
    plt.figure()
    plt.plot(loss_history)
    plt.show()

    # Make the final predictions
    with torch.no_grad():
        nn_preds = ode.odeint(nn_model, y0, time, method = integration_method, block_size = n_chunk)
        nn_obs = nn_preds[...,0]

    # Make a plot
    plt.figure()
    for i in range(n_samples):
        l, = plt.plot(time.cpu().numpy()[:,i], observable.cpu().numpy()[:,i])
        plt.plot(time.cpu().numpy()[:,i], nn_obs.cpu().numpy()[:,i], ls = '--', color = l.get_color())
    plt.show()
    
    # Compare all three
    plt.figure()
    for i in range(n_samples):
        l, = plt.plot(time.cpu().numpy()[:,i], observable.cpu().numpy()[:,i])
        plt.plot(time.cpu().numpy()[:,i], ode_obs.cpu().numpy()[:,i], ls = '--', color = l.get_color())
        plt.plot(time.cpu().numpy()[:,i], nn_obs.cpu().numpy()[:,i], ls = ':', color = l.get_color())
    plt.show()

Implicit versus explicit integration

The example in examples/ode/vanderpohl.py demonstrates the need for an implicit integration scheme when integrating stiff systems of ODEs, including structural constitutive models. The example uses the ode package to integrate the classic van der Pol equation with different parameters to demonstrate the need for implicit time integration as the equations become stiff. The results of the calculations are differentiable using either direct torch AD or the adjoint methods.

#!/usr/bin/env python3

"""
  This example demonstrate the ability of the backward Euler integration
  option to accurately integrate a very stiff system of ODEs.  The example
  system used here is the Van der Pol system from Manichev et al. 2019.

  .. math::

    \\dot{y}_0 = y_1
    \\dot{y}_1 = -y_0 + \\mu \\left(1 - y_0^2 \\right) y_1

  with initial conditions

  .. math::

    y_0(0) = -1
    y_1(0) = 1

  and :math:`t \\in [0,4.2\\mu].

  This system of ODEs is parameterized by :math:`\\mu`.  For
  :math:`mu = 1.0` the system is not stiff and can be integrated by
  either forward or backward methods.  For :math:`\\mu=14` the equations
  begin to become stiff and the forward Euler method becomes inaccurate.
  For :math:`\\mu=30` the equations are very stiff and can only
  be integrated with the backward Euler method.
"""

import sys

sys.path.append("../..")

import numpy as np
import torch

import matplotlib.pyplot as plt

from pyoptmat import ode

torch.set_default_tensor_type(torch.DoubleTensor)


class VanderPolODE(torch.nn.Module):
    """
    From Manichev et al 2019

      x0 = [-1, 1]
      t = [0, 4.2*mu)
    """

    def __init__(self, mu):
        super().__init__()
        self.mu = torch.tensor(mu)

    def forward(self, t, y):
        f = torch.empty(y.shape)
        f[..., 0] = y[..., 1]
        f[..., 1] = -y[..., 0] + self.mu * (1.0 - y[..., 0] ** 2.0) * y[..., 1]

        df = torch.empty(y.shape + y.shape[-1:])
        df[..., 0, 0] = 0
        df[..., 0, 1] = 1
        df[..., 1, 0] = -1 - 2.0 * self.mu * y[..., 0] * y[..., 1]
        df[..., 1, 1] = self.mu * (1.0 - y[..., 0] ** 2.0)

        return f, df


if __name__ == "__main__":
    # Common inputs
    y0 = torch.tensor([[-1.0, 1]])
    period = lambda m: (3.0 - 2 * np.log(2)) * m + 2.0 * np.pi / mu ** (1.0 / 3)

    # Number of vectorized time steps
    time_chunk = 10

    # Test for non-stiff version
    mu = 1.0
    times = torch.linspace(0, period(mu) * 10, 10000).unsqueeze(-1)

    model = VanderPolODE(mu)

    res_exp = ode.odeint(model, y0, times, method="forward-euler",
            block_size = time_chunk, guess_type = "previous")
    res_imp = ode.odeint(
        model, y0, times, method="backward-euler",
        block_size = time_chunk, guess_type = "previous"
    )

    plt.figure()
    plt.plot(times, res_exp[:, 0, 0], label="Forward")
    plt.plot(times, res_imp[:, 0, 0], label="Backward")
    plt.xlabel("time")
    plt.ylabel(r"$y_0$")
    plt.legend(loc="best")
    plt.title("Not stiff")
    plt.show()

    # Test for moderately stiff version
    mu = 14.0
    times = torch.linspace(0, period(mu) * 10, 10000).unsqueeze(-1)

    model = VanderPolODE(mu)

    res_exp = ode.odeint(model, y0, times, method="forward-euler",
            block_size = time_chunk, guess_type = "previous")
    res_imp = ode.odeint(
        model, y0, times, method="backward-euler",
        block_size = time_chunk, guess_type = "previous"
    )

    plt.figure()
    plt.plot(times, res_exp[:, 0, 0], label="Forward")
    plt.plot(times, res_imp[:, 0, 0], label="Backward")
    plt.legend(loc="best")
    plt.xlabel("time")
    plt.ylabel(r"$y_0$")
    plt.title("Moderately stiff")
    plt.show()

    # Test for highly stiff version (explicit just explodes)
    mu = 30.0
    times = torch.linspace(0, period(mu) * 10 * 10.0 / 16, 20000).unsqueeze(-1)

    model = VanderPolODE(mu)

    res_imp = ode.odeint(
        model, y0, times, method="backward-euler",
        block_size = time_chunk, guess_type = "previous"
    )

    plt.figure()
    plt.plot(times, res_imp[:, 0, 0], label="Backward")
    plt.legend(loc="best")
    plt.xlabel("time")
    plt.ylabel(r"$y_0$")
    plt.title("Very stiff")
    plt.show()

Statistical inference with a simple ODE

This example (examples/ode/trajectory.py) demonstrates the basic application of pyoptmat to find parameter distributions for systems of stochastic ODEs that match uncertain data. This example demonstrates these concepts using trajectories from a canon fired at a random angle and speed, drawn from known distributions. The example tries to infer the distributions of angle and speed given some number of observed trajectories. This problem is small enough that you can run it using either the adjoint method or AD to calculate the sensitivities, to compare the relative performance of both approaches.

#!/usr/bin/env python3

"""
  This example demonstrates using the basic capabilities of pyopmat to
  find parameter distributions for a system of ODEs to match variable
  measured data.  The particular example here is a cannon firing with a
  random velocity, parameterized here with a random speed and launch
  angle.  The observed "data" is the resulting trajectory of the cannon ball,
  starting from the point of launch and ending when it hits the ground.
  In this case, the x-coordinate represents the distance from the cannon
  and the y-coordinate the height above the initial launch point.

  This example further applies some additional white noise on top of the
  trajectory "measurements" representing random experimental error in
  measuring or recording the data.

  The goal of the variational inference is to recover the statistical
  distribution of launch speeds and angles by observing some number of
  trajectories.  The example also tries to recover the scale of the
  random, white noise superimposed on top of the measured trajectories.
  The example, as setup below, observes 50 random trajectories
  and uses pyro's SVI algorithm to infer the angle and speed distributions.
  The example plots the results and compares the inferred distributions to
  the known actual posteriors.
"""

import matplotlib.pyplot as plt

import torch
import pyro
from pyro.nn import PyroSample
import pyro.distributions as dist
from pyro.infer import SVI, Trace_ELBO, Predictive, JitTrace_ELBO
import pyro.optim as optim
import pyro.distributions.constraints as constraints

from tqdm import tqdm

import sys

sys.path.append("../..")
from pyoptmat import ode

# Gravity
g = torch.tensor(0.1)

# Actual location and scale of the speed and launch angle
# True posteriors are normal distributions
v_loc_act = 2.0
v_scale_act = 0.03
a_loc_act = 0.7
a_scale_act = 0.04

# Prior normal distribution for the speed and launch angle
v_loc_prior = 1.5
v_scale_prior = 0.1
a_loc_prior = 0.3
a_scale_prior = 0.1

# White noise applied to the observations
eps_act = 0.05

# Prior for the noise
eps_prior = 0.1  # Just measure variance in data...

def model_act(times):
    """
    times: ntime x nbatch
    trajectories: ntime x nbatch x 2
    """
    v = pyro.sample("v", dist.Normal(v_loc_act, v_scale_act))
    a = pyro.sample("a", dist.Normal(a_loc_act, a_scale_act))

    simulated = pyro.sample(
        "data",
        dist.Normal(
            torch.stack(
                (
                    v * torch.cos(a) * times,
                    v * torch.sin(a) * times - 0.5 * g * times ** 2.0,
                )
            ).T,
            eps_act,
        ),
    )

    return simulated


class Integrator(pyro.nn.PyroModule):
    def __init__(self, eqn, y0, extra_params=[], block_size = 1):
        super().__init__()
        self.eqn = eqn
        self.y0 = y0
        self.extra_params = extra_params
        self.block_size = block_size

    def forward(self, times):
        return ode.odeint_adjoint(
            self.eqn,
            self.y0,
            times,
            block_size = self.block_size,
            extra_params=self.extra_params,
        )


class ODE(pyro.nn.PyroModule):
    def __init__(self, v, a):
        super().__init__()
        self.v = v
        self.a = a

    def forward(self, t, y):
        f = torch.empty(y.shape)

        # Acceleration
        f[..., 0] = self.v * torch.cos(self.a)
        f[..., 1] = self.v * torch.sin(self.a) - g * t

        # Nice ODE lol
        df = torch.zeros(y.shape + y.shape[-1:])

        return f, df


class Model(pyro.nn.PyroModule):
    def __init__(
        self,
        maker,
        names,
        loc_priors,
        scale_priors,
        loc_suffix="_loc",
        scale_suffix="_scale",
        param_suffix="_param",
        bot_suffix="_bot",
    ):
        super().__init__()

        self.maker = maker

        self.loc_suffix = loc_suffix
        self.scale_suffix = scale_suffix
        self.param_suffix = param_suffix
        self.bot_suffix = bot_suffix
        self.loc_priors = loc_priors
        self.scale_priors = scale_priors
        self.eps_prior = eps_prior
        self.names = names

        # Setup both levels of distributions
        self.bot_vars = names
        self.top_vars = []
        for var, loc, scale in zip(names, loc_priors, scale_priors):
            setattr(self, var + loc_suffix, PyroSample(dist.Normal(loc, scale)))
            self.top_vars.append(var + loc_suffix)
            setattr(self, var + scale_suffix, PyroSample(dist.LogNormal(scale, 1)))
            self.top_vars.append(var + scale_suffix)
            setattr(
                self,
                var,
                PyroSample(
                    lambda self, var=var: dist.Normal(
                        getattr(self, var + loc_suffix),
                        getattr(self, var + scale_suffix),
                    )
                ),
            )

        # Setup noise
        self.eps = PyroSample(dist.LogNormal(eps_prior, 1))

        self.extra_param_names = []

    def forward(self, times, actual=None):
        y0 = torch.zeros((times.shape[1],) + (2,))

        curr = self.sample_top()
        eps = self.eps

        with pyro.plate("trials", times.shape[1]):
            bmodel = self.maker(*self.sample_bot(), extra_params=self.gen_extra())
            simulated = bmodel(times)
            with pyro.plate("time", times.shape[0]):
                pyro.sample("obs", dist.Normal(simulated, eps).to_event(1), obs=actual)

        return simulated

    def sample_top(self):
        return [getattr(self, name) for name in self.top_vars]

    def sample_bot(self):
        return [getattr(self, name) for name in self.bot_vars]

    def make_guide(self):
        def guide(times, actual=None):
            top_loc_samples = []
            top_scale_samples = []

            for name, loc, scale in zip(self.names, self.loc_priors, self.scale_priors):
                loc_param = pyro.param(
                    name + self.loc_suffix + self.param_suffix, torch.tensor(loc)
                )
                scale_param = pyro.param(
                    name + self.scale_suffix + self.param_suffix,
                    torch.tensor(scale),
                    constraint=constraints.positive,
                )

                top_loc_samples.append(
                    pyro.sample(name + self.loc_suffix, dist.Delta(loc_param))
                )
                top_scale_samples.append(
                    pyro.sample(name + self.scale_suffix, dist.Delta(scale_param))
                )

            eps_param = pyro.param(
                "eps" + self.param_suffix,
                torch.tensor(self.eps_prior),
                constraint=constraints.positive,
            )
            eps_sample = pyro.sample("eps", dist.Delta(eps_param))

            with pyro.plate("trials", times.shape[1]):
                for name, loc_sample, scale_sample, loc_prior, scale_prior in zip(
                    self.names,
                    top_loc_samples,
                    top_scale_samples,
                    self.loc_priors,
                    self.scale_priors,
                ):
                    ll_param = pyro.param(
                        name + self.param_suffix,
                        torch.ones(times.shape[1]) * torch.tensor(loc_prior),
                    )
                    pyro.sample(name, dist.Delta(ll_param))

        self.extra_param_names = [var + self.param_suffix for var in self.names]

        return guide

    def gen_extra(self):
        return [pyro.param(name).unconstrained() for name in self.extra_param_names]


if __name__ == "__main__":
    # Number of samples to provide
    nsamples = 50

    # Maximum and number of time steps
    tmax = 20.0
    tnum = 100

    # Number of vectorized time steps to evaluate at once
    time_block = 50

    time = torch.linspace(0, tmax, tnum)
    times = torch.empty(tnum, nsamples)
    data = torch.empty(tnum, nsamples, 2)

    with torch.no_grad():
        for i in range(nsamples):
            times[:, i] = time
            data[:, i] = model_act(time)

    plt.figure()
    plt.plot(data[:, :, 0], data[:, :, 1])
    plt.xlabel("x-coordinate")
    plt.ylabel("y-coordinate")
    plt.title("Trajectory data")
    plt.show()

    pyro.clear_param_store()

    def maker(v, a, **kwargs):
        return Integrator(ODE(v, a), torch.zeros(nsamples, 2),
                block_size = time_block, **kwargs)

    # Setup the model
    model = Model(
        maker, ["v", "a"], [v_loc_prior, a_loc_prior], [v_scale_prior, a_scale_prior]
    )

    # Optimization hyperparameters: learning rate, number of iterations, and
    # number of samples for calculating the ELBO
    lr = 5.0e-3
    niter = 250
    num_samples = 1

    guide = model.make_guide()

    # Init guide
    guide(times)

    optimizer = optim.ClippedAdam({"lr": lr})
    l = Trace_ELBO(num_particles=num_samples)

    svi = SVI(model, guide, optimizer, loss=l)

    t = tqdm(range(niter))
    loss_hist = []
    for i in t:
        loss = svi.step(times, data)
        loss_hist.append(loss)
        t.set_description("Loss: %3.2e" % loss)

    print("Inferred distributions:")
    print(
        "Velocity mean: %4.3f, actual %4.3f"
        % (pyro.param("v_loc_param").data, v_loc_act)
    )
    print(
        "Velocity scale: %4.3f, actual %4.3f"
        % (pyro.param("v_scale_param").data, v_scale_act)
    )
    print(
        "Angle mean: %4.3f, actual %4.3f" % (pyro.param("a_loc_param").data, a_loc_act)
    )
    print(
        "Angle scale: %4.3f, actual %4.3f"
        % (pyro.param("a_scale_param").data, a_scale_act)
    )
    print("White noise: %4.3f, actual %4.3f" % (pyro.param("eps_param").data, eps_act))

    plt.figure()
    plt.plot(loss_hist)
    plt.xlabel("Iteration")
    plt.ylabel("Loss")
    plt.title("Convergence diagram")
    plt.show()

    print("")

    nsample = 1
    predict = Predictive(model, guide=guide, num_samples=nsample, return_sites=("obs",))
    with torch.no_grad():
        samples = predict(times)["obs"]
        min_x, _ = torch.min(samples[0, :, :, 0], 1)
        max_x, _ = torch.max(samples[0, :, :, 0], 1)
        min_y, _ = torch.min(samples[0, :, :, 1], 1)
        max_y, _ = torch.max(samples[0, :, :, 1], 1)

    plt.figure()
    plt.plot(times, data[:, :, 1], "k-", lw=0.5)
    plt.fill_between(time, min_y, max_y, alpha=0.75)
    plt.xlabel("Time")
    plt.ylabel("y coordinate")
    plt.title("Height versus time")
    plt.show()

    plt.figure()
    plt.plot(times, data[:, :, 0], "k-", lw=0.5)
    plt.fill_between(time, min_x, max_x, alpha=0.75)
    plt.xlabel("Time")
    plt.ylabel("x coordinate")
    plt.title("Distance versus time")
    plt.show()

Hogdkin-Huxley coupled neuron model

An example, scalable system of ODEs for benchmarking.

#!/usr/bin/env python

"""
    This example integrates a more complicated system of ODEs:
    a model of coupled neurons responding to a cyclic 
    electrical current (the Hodgkin and Huxley model, as
    described by :cite:`schwemmer2012theory`).

    The number of neurons, current cycles, time steps per 
    cycle, vectorized time steps, etc. can be customized.
    This is a decent benchmark problem for examining the performance
    of pyoptmat on your machine.
"""

import sys
sys.path.append('../..')

import torch
import torch.nn as nn

import matplotlib.pyplot as plt

from pyoptmat import ode, experiments, utility
import time

# Use doubles
torch.set_default_tensor_type(torch.DoubleTensor)

# Select device to run on
if torch.cuda.is_available():
    dev = "cuda:0"
else:
    dev = "cpu"
device = torch.device(dev)

def driving_current(I_max, R, rate, thold, chold, Ncycles, nload,
        nhold, neq):
    """
        Setup cyclic current histories

        Args:
            I_max (tuple): (min,max) uniform distribution of max currents
            R (tuple): (min,max) uniform distribution of load ratios
            rate (tuple): (min, max) uniform distribution of current rates
            thold (tuple): (min, max) uniform distribution of hold at max current times
            chold (tuple): (min, max) uniform distribution of hold at min current times
            Ncycles (int): number of load cycles
            nload (int): number of time steps for current ramp
            nhold (int): number of time steps for current holds
            neq (int): number of independent current histories
    """
    Ntotal = (4*nload + 2*nhold) * Ncycles + 1
    times = torch.empty((Ntotal,neq))
    I = torch.empty((Ntotal,neq))
    for i in range(neq):
        cycle = experiments.generate_random_cycle(
                I_max, R, rate, thold, chold)
        ti, Ii, ci = experiments.sample_cycle_normalized_times(
                cycle, Ncycles, nload, nhold)
        times[:,i] = torch.tensor(ti)
        I[:,i] = torch.tensor(Ii)
    
    return utility.ArbitraryBatchTimeSeriesInterpolator(
            times.to(device), I.to(device)), times.to(device), I.to(device)

class HodgkinHuxleyCoupledNeurons(torch.nn.Module):
    """
        As described by :cite:`schwemmer2012theory`

        .. math::

            F(y) = \\begin{bmatrix} F_1(y_1)
            \\\\ F_2(y_2)
            \\\\ \\vdots
            \\\\ F_n(y_n) 
            \\end{bmatrix}

        with

        .. math::

            y = \\begin{bmatrix} y_1
            \\\ y_2
            \\\ \\vdots
            \\\ y_n
            \\end{bmatrix}

        and

        .. math::

            F_i(y_i) = \\begin{bmatrix} V_i
            \\\\ m_i
            \\\\ h_i
            \\\\ n_i
            \\end{bmatrix}

        and

        .. math::

            F_i(y_i) = \\begin{bmatrix} \\frac{1}{C_i}(-g_{Na,i}m_i^3h_i(V_i-E_{Na,i})-g_{K,i}n_i^4(V_i-E_{k,i})-g_{L,i}(V_i-E_{L,i})+I(t) + \\Delta V_{ij})
            \\\\ \\frac{m_{\\infty,i}-m_i}{\\tau_{m,i}}
            \\\\ \\frac{h_{\\infty,i}-h_i}{\\tau_{h,i}}
            \\\\ \\frac{n_{\\infty,i}-n_i}{\\tau_{n,i}}
            \\end{bmatrix}

        with

        .. math::

            \\Delta V_{ij} = \\sum_{j=1}^{n_{neurons}}\\frac{g_{C,i}(V_i-V_j)}{C_i}

    """
    def __init__(self, C, g_Na, E_Na, g_K, E_K, g_L, E_L, m_inf,
            tau_m, h_inf, tau_h, n_inf, tau_n, g_C, I):
        super().__init__()
        self.C = nn.Parameter(C)
        self.g_Na = nn.Parameter(g_Na)
        self.E_Na = nn.Parameter(E_Na)
        self.g_K = nn.Parameter(g_K)
        self.E_K = nn.Parameter(E_K)
        self.g_L = nn.Parameter(g_L)
        self.E_L = nn.Parameter(E_L)
        self.m_inf = nn.Parameter(m_inf)
        self.tau_m = nn.Parameter(tau_m)
        self.h_inf = nn.Parameter(h_inf)
        self.tau_h = nn.Parameter(tau_h)
        self.n_inf = nn.Parameter(n_inf)
        self.tau_n = nn.Parameter(tau_n)
        self.g_C = nn.Parameter(g_C)

        self.I = I

        self.n_neurons = self.g_C.shape[0]
        self.n_equations = self.n_neurons * 4

    def forward(self, t, y):
        """
            Evaluate the system of ODEs at a particular state

            Args:
                t (torch.tensor): current time 
                y (torch.tensor): current state

            Returns:
                y_dot (torch.tensor): current ODEs
                J (torch.tensor): current jacobian
        """
        Ic = self.I(t)

        V = y[...,0::4].clone()
        m = y[...,1::4].clone()
        h = y[...,2::4].clone()
        n = y[...,3::4].clone()

        ydot = torch.zeros_like(y)
        
        # V
        ydot[...,0::4] = 1.0 / self.C[None,...] * (
                -self.g_Na[None,...] * m**3.0 * h * (V - self.E_Na[None,...])
                -self.g_K[None,...] * n**4.0 * (V - self.E_K[None,...])
                -self.g_L[None,...] * (V - self.E_L[None,...]) + Ic[...,None])
        # Coupling term
        dV = torch.sum(self.g_C[None,...]*(V[...,:,None] - V[...,None,:]) / self.C[None,...], dim = -1)
        ydot[...,0::4] += dV

        # m
        ydot[...,1::4] = (self.m_inf - m) / self.tau_m

        # h 
        ydot[...,2::4] = (self.h_inf - h) / self.tau_h

        # n
        ydot[...,3::4] = (self.n_inf - n) / self.tau_n

        J = torch.zeros(y.shape + y.shape[-1:], device = ydot.device)
        
        # V, V
        J[...,0::4,0::4] = torch.diag_embed(1.0 / self.C[None,...] * (
                -self.g_L[None,...] 
                -self.g_Na[None,...]*h*m**3.0
                -self.g_K[None,...]*n**4.0))
        # Coupling term
        J[...,0::4,0::4] -= self.g_C[None,...] / self.C[None,...] 
        
        J[...,0::4,0::4] += torch.eye(self.n_neurons, device = ydot.device).expand(
                *ydot.shape[:-1], -1, -1) * torch.sum(self.g_C / self.C)
        
        # V, m
        J[...,0::4,1::4] = torch.diag_embed(-1.0 / self.C[None,...] * (
                3.0 * self.g_Na[None,...] * h * m**2.0 * (-self.E_Na[None,...] + V)))

        # V, h
        J[...,0::4,2::4] = torch.diag_embed(-1.0 / self.C[None,...] * (
                self.g_Na[None,...] * m**3.0 * (-self.E_Na[None,...] + V)))

        # V, n
        J[...,0::4,3::4] = torch.diag_embed(-1.0 / self.C[None,...] * (
                4.0 * self.g_K[None,...] * n**3.0 * (-self.E_K[None,...] + V)))

        # m, m
        J[...,1::4,1::4] = torch.diag(-1.0 / self.tau_m)

        # h, h 
        J[...,2::4,2::4] = torch.diag(-1.0 / self.tau_h)

        # n, n
        J[...,3::4,3::4] = torch.diag(-1.0 / self.tau_n)
    
        return ydot, J

if __name__ == "__main__":
    # Batch size
    nbatch = 100
    # Time chunk size
    time_block = 50
    # Number of neurons
    neq = 10
    # Number of cycles
    N = 5
    # Number of time steps per cycle
    n = 40
    
    # Setup the driving current
    current, times, discrete_current = driving_current([0.1,1],
            [-1,0.9], 
            [0.5,1.5],
            [0,1],
            [0,1],
            N,
            n,
            n,
            nbatch)

    plt.plot(times.cpu().numpy()[:,::4], discrete_current.cpu().numpy()[:,::4])
    plt.xlabel("Time")
    plt.ylabel("Current")
    plt.show()

    # Setup model and initial conditions
    nsteps = times.shape[0]
    
    model = HodgkinHuxleyCoupledNeurons(
            torch.rand((neq,), device = device),
            torch.rand((neq,), device = device),
            torch.rand((neq,), device = device),
            torch.rand((neq,), device = device),
            torch.rand((neq,), device = device),
            torch.rand((neq,), device = device),
            torch.rand((neq,), device = device),
            torch.rand((neq,), device = device),
            torch.rand((neq,), device = device) * 5.0,
            torch.rand((neq,), device = device),
            torch.rand((neq,), device = device) * 15.0,
            torch.rand((neq,), device = device),
            torch.rand((neq,), device = device) * 10.0,
            torch.rand((neq,), device = device) * 0.01,
            current)

    y0 = torch.rand((nbatch,model.n_equations), device = device)
    

    # Include a backward pass to give a better example of timing
    t1 = time.time()
    res_imp = ode.odeint_adjoint(model, y0, times, block_size = time_block)
    loss = torch.norm(res_imp)
    loss.backward()
    etime = time.time() - t1
    
    print("%i batch size, %i blocked time steps, %i neurons, %i time steps: %f s" % (nbatch, time_block, neq, nsteps, etime))

    plt.plot(times[:,0].detach().cpu().numpy(), res_imp[:,0,0::4].detach().cpu().numpy())
    plt.xlabel("Time")
    plt.ylabel("Voltage")
    plt.show()

Temperature dependent parameters

examples/structural-material-models/temperature_dependence.py illustrates how to setup and run structural material models with temperature dependent parameters.

#!/usr/bin/env python3

"""
  This is a simple demonstration of how pyopmat models can vary with
  temperature.  The example simulates the response of a simple, temperature
  dependent, viscoplastic model for Alloy 617 developed by
  Messner, Phan, and Sham under uniaxial, monotonic tensile strain
  at several different temperatures.
"""

import sys

sys.path.append("../..")

import torch

import matplotlib.pyplot as plt

from pyoptmat import models, flowrules, temperature, experiments

torch.set_default_tensor_type(torch.DoubleTensor)

if __name__ == "__main__":
    # This example illustrates the temperature and rate sensitivity of
    # Alloy 617 with the model given in:
    # Messner, Phan, and Sham. IJPVP 2019.

    # Setup test conditions
    ntemps = 5
    temps = torch.linspace(750, 950, ntemps) + 273.15
    elimits = torch.ones(ntemps) * 0.25
    erates = torch.ones(ntemps) * 8.33e-5

    nsteps = 200

    times, strains, temperatures, cycles = experiments.make_tension_tests(
        erates, temps, elimits, nsteps
    )

    # Perfect viscoplastic model
    A = torch.tensor(-8.679)
    B = torch.tensor(-0.744)
    mu = temperature.PolynomialScaling(
        torch.tensor([-1.34689305e-02, -5.18806776e00, 7.86708330e04])
    )
    b = torch.tensor(2.474e-7)
    k = torch.tensor(1.38064e-20)
    eps0 = torch.tensor(1e10)
    E = temperature.PolynomialScaling(
        torch.tensor([-3.48056033e-02, -1.44398964e01, 2.06464967e05])
    )

    fr = flowrules.PerfectViscoplasticity(
        temperature.KMRateSensitivityScaling(A, mu, b, k),
        temperature.KMViscosityScaling(A, B, mu, eps0, b, k),
    )
    model = models.InelasticModel(E, fr)
    integrator = models.ModelIntegrator(model)

    stresses = integrator.solve_strain(times, strains, temperatures)[:, :, 0]

    print("Temperature scaling")
    for ei, Ti, si in zip(
        strains.T.numpy(), temperatures.T.numpy(), stresses.T.numpy()
    ):
        plt.plot(ei, si, label="T = %3.0fK" % Ti[0])

    plt.legend(loc="best")
    plt.xlabel("Strain (mm/mm)")
    plt.ylabel("Stress (Mpa)")
    plt.show()

Stress and strain control

The example in examples/structural-material-models/stress_strain_control.py demonstrates how to run simulations of structural tests conducted under strain and stress control. pyoptmat can run both types of tests and correctly provide derivatives using the adjoint method

#!/usr/bin/env python3

"""
  This example demonstrates the ability of pyoptmat to integrate material models under
  strain control, stress control, or a combination of both.

  The example first simulates standard, strain-rate controlled tensile curves
  for a simple material model for Alloy 617 developed by Messner, Phan, and Sham.  These
  simulations then take time, strain, and temperature as inputs and outputs the
  resulting stresses.

  Then, the example uses the stresses from the strain controlled simulations
  to repeat the simulations under stress control.  These simulations therefore
  take as input time, temperature, and stress and output strain.  The plot demonstrates that
  the strain controlled and stress controlled simulations produce identical results.

  Finally, the example integrates the model twice for each temperature, once in strain
  control and once in stress control, but integrated all the experiments at once.
  This example then demos the ability of pyoptmat to integrate both stress and strain
  controlled tests in the same vectorized calculation.
"""

import sys

sys.path.append("../..")

import torch

import matplotlib.pyplot as plt

from pyoptmat import models, flowrules, temperature, experiments, hardening

torch.set_default_tensor_type(torch.DoubleTensor)

if __name__ == "__main__":
    # This example illustrates the temperature and rate sensitivity of
    # Alloy 617 with the model given in:
    # Messner, Phan, and Sham. IJPVP 2019.

    # Setup test conditions
    ntemps = 5
    temps = torch.linspace(750, 950, ntemps) + 273.15
    elimits = torch.ones(ntemps) * 0.25
    erates = torch.ones(ntemps) * 8.33e-5

    nsteps = 200

    times, strains, temperatures, cycles = experiments.make_tension_tests(
        erates, temps, elimits, nsteps
    )

    # Perfect viscoplastic model
    A = torch.tensor(-8.679)
    B = torch.tensor(-0.744)
    mu = temperature.PolynomialScaling(
        torch.tensor([-1.34689305e-02, -5.18806776e00, 7.86708330e04])
    )
    b = torch.tensor(2.474e-7)
    k = torch.tensor(1.38064e-20)
    eps0 = torch.tensor(1e10)
    E = temperature.PolynomialScaling(
        torch.tensor([-3.48056033e-02, -1.44398964e01, 2.06464967e05])
    )

    ih = hardening.VoceIsotropicHardeningModel(
        temperature.ConstantParameter(torch.tensor(150.0)),
        temperature.ConstantParameter(torch.tensor(10.0)),
    )
    kh = hardening.NoKinematicHardeningModel()

    fr = flowrules.IsoKinViscoplasticity(
        temperature.KMRateSensitivityScaling(A, mu, b, k),
        temperature.KMViscosityScaling(A, B, mu, eps0, b, k),
        temperature.ConstantParameter(0),
        ih,
        kh,
    )
    model = models.InelasticModel(E, fr)
    integrator = models.ModelIntegrator(model)

    stresses = integrator.solve_strain(times, strains, temperatures)[:, :, 0]

    # Now do it backwards!
    strains_prime = integrator.solve_stress(times, stresses, temperatures)[:, :, 0]

    plt.figure()
    for ei, epi, Ti, si in zip(
        strains.T.numpy(),
        strains_prime.T.numpy(),
        temperatures.T.numpy(),
        stresses.T.numpy(),
    ):
        (l,) = plt.plot(ei, si, label="T = %3.0fK" % Ti[0])
        plt.plot(epi, si, label=None, ls="--", color=l.get_color())

    plt.legend(loc="best")
    plt.xlabel("Strain (mm/mm)")
    plt.ylabel("Stress (Mpa)")
    plt.title("Comparison between strain and stress control")
    plt.show()

    # Now do it both ways at once!
    big_times = torch.cat((times, times), 1)
    big_temperatures = torch.cat((temperatures, temperatures), 1)
    big_data = torch.cat((strains, stresses), 1)
    big_types = torch.zeros(2 * ntemps)
    big_types[ntemps:] = 1

    both_results = integrator.solve_both(
        big_times, big_temperatures, big_data, big_types
    )

    plt.figure()
    for i in range(ntemps):
        (l,) = plt.plot(
            strains[:, i], both_results[:, i, 0], label="T = %3.0fK" % Ti[i]
        )
        plt.plot(
            both_results[:, i + ntemps, 0],
            stresses[:, i],
            label=None,
            ls="--",
            color=l.get_color(),
        )

    plt.xlabel("Strain (mm/mm)")
    plt.ylabel("Stress (MPa)")
    plt.legend(loc="best")
    plt.title("Running both strain and stress control at once")
    plt.show()

Creep tests

examples/structural-material-models/stress_control_creep.py is an example of simulating stress-controlled creep tests, with the idea of using such data to train constitutive models.

#!/usr/bin/env python3

"""
  An example of how to load a model in stress control.  The example
  simulates the behavior of a simple viscoplastic model for the high
  temperature deformation of Alloy 617 developed by Messner, Phan,
  and Sham under creep conditions.  The simulations then load the
  model up to a given stress and then hold the model at that stress
  for a long period of time.  The experimental response is often
  given as a creep curve -- a plot of strain versus time during
  the hold at constant stress.  This example plots a modification, giving
  the total accumulated strain as a function of time, including the
  strain accumulated during the load up to the constant stress.
"""

import sys

sys.path.append("../..")

import torch

import matplotlib.pyplot as plt

from pyoptmat import models, flowrules, temperature, experiments, hardening

torch.set_default_tensor_type(torch.DoubleTensor)

if __name__ == "__main__":
    # This example illustrates the temperature and rate sensitivity of
    # Alloy 617 with the model given in:
    # Messner, Phan, and Sham. IJPVP 2019.

    # Setup test conditions
    target_temperature = 950.0
    target_stresses = torch.linspace(80, 140, 13)
    target_times = torch.ones_like(target_stresses) * 100000 * 3600.0

    nbatch = len(target_stresses)
    nsteps_load = 50
    nsteps_hold = 100
    nsteps = nsteps_load + nsteps_hold

    loading_rate = 1.0

    times, stresses, temperatures, cycles = experiments.make_creep_tests(
        target_stresses,
        torch.ones_like(target_stresses) * target_temperature,
        torch.ones_like(target_stresses) * loading_rate,
        target_times,
        nsteps_load,
        nsteps_hold,
    )

    # Model with hardening
    A = torch.tensor(-8.679)
    B = torch.tensor(-0.744)
    mu = temperature.PolynomialScaling(
        torch.tensor([-1.34689305e-02, -5.18806776e00, 7.86708330e04])
    )
    b = torch.tensor(2.474e-7)
    k = torch.tensor(1.38064e-20)
    eps0 = torch.tensor(1e10)
    E = temperature.PolynomialScaling(
        torch.tensor([-3.48056033e-02, -1.44398964e01, 2.06464967e05])
    )

    ih = hardening.VoceIsotropicHardeningModel(
        temperature.ConstantParameter(torch.tensor(150.0)),
        temperature.ConstantParameter(torch.tensor(10.0)),
    )
    kh = hardening.NoKinematicHardeningModel()

    fr = flowrules.IsoKinViscoplasticity(
        temperature.KMRateSensitivityScaling(A, mu, b, k),
        temperature.KMViscosityScaling(A, B, mu, eps0, b, k),
        temperature.ConstantParameter(0),
        ih,
        kh,
    )
    model = models.InelasticModel(E, fr)
    integrator = models.ModelIntegrator(model)

    strains = integrator.solve_stress(times, stresses, temperatures)[:, :, 0]

    plt.plot(times / 3600.0, strains)
    plt.xlabel("Time (hrs)")
    plt.ylabel("Total strain (mm/mm)")
    plt.show()

AD versus adjoint comparison

examples/performance/ad_vs_adjoint.py compares the efficiency of computing the parameter gradients using the AD versus the adjoint method. It demonstrates that the adjoint method is always better for practical problems: it is both faster and requires far less memory on the device.

#!/usr/bin/env python3

"""
    A comparison of the efficiency of Pytorch AD versus the adjoint method
    for calculating the parameter gradient of a model.

    The demo runs `n_tests` tension tests using `n_steps` timesteps to
    integrate each test.  Given these two size parameter as input, the 
    code:
    
    1. Generates the input tensors describing the time/strain history of
       each tensile test
    2. Runs the same process twice, once using Pytorch AD to calculate the
       gradient and then again using the adjoint method

       a. Integrate the tests through time (forward pass)
       b. Calculate the norm of the resulting integrated stresses, which 
          is a reasonable surrogate for the actual loss functions used
          during training
       c. Calculate the gradient of that norm value with respect to the model
          parameters

    3. Compare the walltime and memory costs of running the analysis using
       the two methods.

    Running this script on a NVIDIA GeForce RTX 3070 Ti gives the following
    results for `n_tests = 500`:
    
    ======= ============= ============= ================== =================
    n_time  AD, time (s)  AD, mem (MB)  Adjoint, time (s)  Adjoint, mem (MB)
    ======= ============= ============= ================== =================
    100     9.266         274.4         7.057              18.42
    200     16.93         475.6         12.95              37.58
    300     24.43         674.8         18.92              55.21
    400     31.96         872.0         25.01              73.45    
    500     40.59         1062          31.97              93.76
    750     60.26         1527          46.13              137.8
    1000    77.55         2027          62.43              188.0
    ======= ============= ============= ================== =================

    Running for fixed `n_time = 100` gives:

    ======== ============= ============= ================== =================
    n_tests  AD, time (s)  AD, mem (MB)  Adjoint, time (s)  Adjoint, mem (MB)
    ======== ============= ============= ================== =================
    100      8.963         68.27         6.859              3.781
    200      9.193         131.9         6.993              7.512
    300      9.084         171.7         6.962              11.10
    400      9.257         234.6         6.947              14.83 
    500      9.175         274.4         7.004              18.42
    750      9.280         410.1         7.034              27.60
    1000     9.424         545.2         7.084              37.72
    5000     9.984         2686.7        7.613              187.3
    ======== ============= ============= ================== =================
"""

# Size of problem to run
n_tests = 500
n_time = 500

import sys

sys.path.append("../..")

import time

from pyoptmat import models, flowrules, temperature, experiments, hardening, optimize
from pyoptmat.temperature import ConstantParameter as CP

import torch

torch.set_default_tensor_type(torch.DoubleTensor)

if torch.cuda.is_available():
    dev = "cuda:0"
else:
    dev = "cpu"
device = torch.device(dev)


def model_maker(E, R, d, n, eta, s0, C, g, **kwargs):
    """
    Make the model
    """
    iso = hardening.VoceIsotropicHardeningModel(CP(R), CP(d))
    kin = hardening.ChabocheHardeningModel(CP(C), CP(g))
    flow = flowrules.IsoKinViscoplasticity(CP(n), CP(eta), CP(s0), iso, kin)
    mod = models.InelasticModel(CP(E), flow)
    return models.ModelIntegrator(mod, **kwargs).to(device)


if __name__ == "__main__":
    # Model parameters to use
    E = torch.tensor(150000.0, device=device)
    R = torch.tensor(200.0, device=device)
    d = torch.tensor(5.0, device=device)
    n = torch.tensor(7.0, device=device)
    eta = torch.tensor(300.0, device=device)
    s0 = torch.tensor(50.0, device=device)
    C = torch.tensor([10000.0, 5000.0], device=device)
    g = torch.tensor([100.0, 50.0], device=device)
    names = ["E", "R", "d", "n", "eta", "s0", "C", "g"]
    values = [E, R, d, n, eta, s0, C, g]

    # Setup the input data
    rates = torch.ones(n_tests, device=device) * 1.0e-5
    temps = torch.zeros(n_tests, device=device)
    elimits = torch.ones(n_tests, device=device) * 0.2
    times, strains, temps, cycles = experiments.make_tension_tests(
        rates, temps, elimits, n_time
    )

    data = torch.stack((times, temps, strains)).to(device)
    types = torch.tensor([experiments.exp_map["tensile"]] * n_tests, device=device)
    control = torch.tensor([experiments.control_map["strain"]] * n_tests, device=device)

    print("Running example with size %i x %i" % (n_tests, n_time))
    print("")

    # Do this twice (once for AD, once for adjoint)
    for use_adjoint in [False, True]:
        # Get the actual model
        maker = lambda *p: model_maker(*p, use_adjoint=use_adjoint)
        model = optimize.DeterministicModel(maker, names, values)

        # Start timing/monitoring
        tstart = time.time()
        torch.cuda.reset_peak_memory_stats(device)

        # Run the forward integration
        pred = model(data, cycles, types, control)

        # Calculate the loss surrogate
        loss = torch.norm(pred)

        # Calculate the gradient
        loss.backward()

        # Stop monitoring
        tend = time.time()
        mem = torch.cuda.max_memory_allocated() / 1024 / 1024

        # Print statistics
        if use_adjoint:
            print("Adjoint method:")
        else:
            print("Torch AD:")
        print("\tWalltime (s): %f" % (tend - tstart))
        print("\tMax memory use (MB): %f" % mem)
        print("")