Source code for openmc.deplete.pool

"""Dedicated module containing depletion function

Provided to avoid some circular imports
"""
from itertools import repeat, starmap
from multiprocessing import Pool

from scipy.sparse import bmat, hstack, vstack, csc_matrix
import numpy as np

from openmc.mpi import comm

# Configurable switch that enables / disables the use of
# multiprocessing routines during depletion
USE_MULTIPROCESSING = True

# Allow user to override the number of worker processes to use for depletion
# calculations
NUM_PROCESSES = None

def _distribute(items):
    """Distribute items across MPI communicator

    Parameters
    ----------
    items : list
        List of items of distribute

    Returns
    -------
    list
        Items assigned to process that called

    """
    min_size, extra = divmod(len(items), comm.size)
    j = 0
    for i in range(comm.size):
        chunk_size = min_size + int(i < extra)
        if comm.rank == i:
            return items[j:j + chunk_size]
        j += chunk_size

[docs] def deplete(func, chain, n, rates, dt, current_timestep=None, matrix_func=None, transfer_rates=None, external_source_rates=None, *matrix_args): """Deplete materials using given reaction rates for a specified time Parameters ---------- func : callable Function to use to get new compositions. Expected to have the signature ``func(A, n0, t) -> n1`` chain : openmc.deplete.Chain Depletion chain n : list of numpy.ndarray List of atom number arrays for each material. Each array in the list contains the number of [atom] of each nuclide. rates : openmc.deplete.ReactionRates Reaction rates (from transport operator) dt : float Time in [s] to deplete for current_timestep : int Current timestep index maxtrix_func : callable, optional Function to form the depletion matrix after calling ``matrix_func(chain, rates, fission_yields)``, where ``fission_yields = {parent: {product: yield_frac}}`` Expected to return the depletion matrix required by ``func`` transfer_rates : openmc.deplete.TransferRates, Optional Transfer rates for continuous removal/feed. .. versionadded:: 0.14.0 external_source_rates : openmc.deplete.ExternalSourceRates, Optional External source rates for continuous removal/feed. .. versionadded:: 0.15.3 matrix_args: Any, optional Additional arguments passed to matrix_func Returns ------- n_result : list of numpy.ndarray Updated list of atom number arrays for each material. Each array in the list contains the number of [atom] of each nuclide. """ fission_yields = chain.fission_yields if len(fission_yields) == 1: fission_yields = repeat(fission_yields[0]) elif len(fission_yields) != len(n): raise ValueError( "Number of material fission yield distributions {} is not " "equal to the number of compositions {}".format( len(fission_yields), len(n))) if matrix_func is None: matrices = map(chain.form_matrix, rates, fission_yields) else: matrices = map(matrix_func, repeat(chain), rates, fission_yields, *matrix_args) if (transfer_rates is not None and current_timestep in transfer_rates.external_timesteps): # Calculate transfer rate terms as diagonal matrices transfers = map(chain.form_rr_term, repeat(transfer_rates), repeat(current_timestep), transfer_rates.local_mats) # Subtract transfer rate terms from Bateman matrices matrices = [matrix - transfer for (matrix, transfer) in zip(matrices, transfers)] if current_timestep in transfer_rates.index_transfer: # Gather all on comm.rank 0 matrices = comm.gather(matrices) n = comm.gather(n) if comm.rank == 0: # Expand lists matrices = [elm for matrix in matrices for elm in matrix] n = [n_elm for n_mat in n for n_elm in n_mat] # Calculate transfer rate terms as diagonal matrices transfer_pair = {} for mat_pair in transfer_rates.index_transfer[current_timestep]: transfer_matrix = chain.form_rr_term(transfer_rates, current_timestep, mat_pair) transfer_pair[mat_pair] = transfer_matrix # Combine all matrices together in a single matrix of matrices # to be solved in one go n_rows = n_cols = len(transfer_rates.burnable_mats) rows = [] for row in range(n_rows): cols = [] for col in range(n_cols): mat_pair = (transfer_rates.burnable_mats[row], transfer_rates.burnable_mats[col]) if row == col: # Fill the diagonals with the Bateman matrices cols.append(matrices[row]) elif mat_pair in transfer_rates.index_transfer[current_timestep]: # Fill the off-diagonals with the transfer pair matrices cols.append(transfer_pair[mat_pair]) else: cols.append(None) rows.append(cols) matrix = bmat(rows) # Concatenate vectors of nuclides in one n_multi = np.concatenate(n) n_result = func(matrix, n_multi, dt) # Split back the nuclide vector result into the original form n_result = np.split(n_result, np.cumsum([len(i) for i in n])[:-1]) else: n_result = None # Braodcast result to other ranks n_result = comm.bcast(n_result) # Distribute results across MPI n_result = _distribute(n_result) return n_result if (external_source_rates is not None and current_timestep in external_source_rates.external_timesteps): # Calculate external source term vectors sources = map(chain.form_ext_source_term, repeat(external_source_rates), repeat(current_timestep), external_source_rates.local_mats) # stack vector column at the end of the matrix matrices = [ hstack([matrix, source]) for matrix, source in zip(matrices, sources) ] # Add a last row of zeroes to the matrices and append 1 to the last row # of the nuclide vectors for i, matrix in enumerate(matrices): if not np.equal(*matrix.shape): matrices[i] = vstack([matrix, csc_matrix([0]*matrix.shape[1])]) n[i] = np.append(n[i], 1.0) inputs = zip(matrices, n, repeat(dt)) if USE_MULTIPROCESSING: with Pool(NUM_PROCESSES) as pool: n_result = list(pool.starmap(func, inputs)) else: n_result = list(starmap(func, inputs)) # Remove extra value at the end of the nuclide vectors if (external_source_rates is not None and current_timestep in external_source_rates.external_timesteps): external_source_rates.reformat_nuclide_vectors(n) external_source_rates.reformat_nuclide_vectors(n_result) return n_result