Source code for openmc.deplete.r2s

from __future__ import annotations
from collections.abc import Sequence
import copy
from datetime import datetime
import json
from pathlib import Path

import numpy as np
import openmc
from . import IndependentOperator, PredictorIntegrator
from .microxs import get_microxs_and_flux, write_microxs_hdf5, read_microxs_hdf5
from .results import Results
from ..checkvalue import PathLike


def get_activation_materials(
    model: openmc.Model, mmv: openmc.MeshMaterialVolumes
) -> openmc.Materials:
    """Get a list of activation materials for each mesh element/material.

    When performing a mesh-based R2S calculation, a unique material is needed
    for each activation region, which is a combination of a mesh element and a
    material within that mesh element. This function generates a list of such
    materials, each with a unique name and volume corresponding to the mesh
    element and material.

    Parameters
    ----------
    model : openmc.Model
        The full model containing the geometry and materials.
    mmv : openmc.MeshMaterialVolumes
        The mesh material volumes object containing the materials and their
        volumes for each mesh element.

    Returns
    -------
    openmc.Materials
        A list of materials, each corresponding to a unique mesh element and
        material combination.

    """
    # Get the material ID, volume, and element index for each element-material
    # combination
    mat_ids = mmv._materials[mmv._materials > -1]
    volumes = mmv._volumes[mmv._materials > -1]
    elems, _ = np.where(mmv._materials > -1)

    # Get all materials in the model
    material_dict = model._get_all_materials()

    # Create a new activation material for each element-material combination
    materials = openmc.Materials()
    for elem, mat_id, vol in zip(elems, mat_ids, volumes):
        mat = material_dict[mat_id]
        new_mat = mat.clone()
        new_mat.depletable = True
        new_mat.name = f'Element {elem}, Material {mat_id}'
        new_mat.volume = vol
        materials.append(new_mat)

    return materials


[docs] class R2SManager: """Manager for Rigorous 2-Step (R2S) method calculations. This class is responsible for managing the materials and sources needed for mesh-based or cell-based R2S calculations. It provides methods to get activation materials and decay photon sources based on the mesh/cells and materials in the OpenMC model. This class supports the use of a different models for the neutron and photon transport calculation. However, for cell-based calculations, it assumes that the only changes in the model are material assignments. For mesh-based calculations, it checks material assignments in the photon model and any element--material combinations that don't appear in the photon model are skipped. Parameters ---------- neutron_model : openmc.Model The OpenMC model to use for neutron transport. domains : openmc.MeshBase or Sequence[openmc.Cell] The mesh or a sequence of cells that represent the spatial units over which the R2S calculation will be performed. photon_model : openmc.Model, optional The OpenMC model to use for photon transport calculations. If None, a shallow copy of the neutron_model will be created and used. Attributes ---------- domains : openmc.MeshBase or Sequence[openmc.Cell] The mesh or a sequence of cells that represent the spatial units over which the R2S calculation will be performed. neutron_model : openmc.Model The OpenMC model used for neutron transport. photon_model : openmc.Model The OpenMC model used for photon transport calculations. method : {'mesh-based', 'cell-based'} Indicates whether the R2S calculation uses mesh elements ('mesh-based') as the spatial discetization or a list of a cells ('cell-based'). results : dict A dictionary that stores results from the R2S calculation. """ def __init__( self, neutron_model: openmc.Model, domains: openmc.MeshBase | Sequence[openmc.Cell], photon_model: openmc.Model | None = None, ): self.neutron_model = neutron_model if photon_model is None: # Create a shallow copy of the neutron model for photon transport self.photon_model = openmc.Model( geometry=copy.copy(neutron_model.geometry), materials=copy.copy(neutron_model.materials), settings=copy.copy(neutron_model.settings), tallies=copy.copy(neutron_model.tallies), plots=copy.copy(neutron_model.plots), ) else: self.photon_model = photon_model if isinstance(domains, openmc.MeshBase): self.method = 'mesh-based' else: self.method = 'cell-based' self.domains = domains self.results = {}
[docs] def run( self, timesteps: Sequence[float] | Sequence[tuple[float, str]], source_rates: float | Sequence[float], timestep_units: str = 's', photon_time_indices: Sequence[int] | None = None, output_dir: PathLike | None = None, bounding_boxes: dict[int, openmc.BoundingBox] | None = None, chain_file: PathLike | None = None, micro_kwargs: dict | None = None, mat_vol_kwargs: dict | None = None, run_kwargs: dict | None = None, operator_kwargs: dict | None = None, ): """Run the R2S calculation. Parameters ---------- timesteps : Sequence[float] or Sequence[tuple[float, str]] Sequence of timesteps. Note that values are not cumulative. The units are specified by the `timestep_units` argument when `timesteps` is an iterable of float. Alternatively, units can be specified for each step by passing an iterable of (value, unit) tuples. source_rates : float or Sequence[float] Source rate in [neutron/sec] for each interval in `timesteps`. timestep_units : {'s', 'min', 'h', 'd', 'a'}, optional Units for values specified in the `timesteps` argument when passing float values. 's' means seconds, 'min' means minutes, 'h' means hours, 'd' means days, and 'a' means years (Julian). photon_time_indices : Sequence[int], optional Sequence of time indices at which photon transport should be run; represented as indices into the array of times formed by the timesteps. For example, if two timesteps are specified, the array of times would contain three entries, and [2] would indicate computing photon results at the last time. A value of None indicates to run photon transport for each time. output_dir : PathLike, optional Path to directory where R2S calculation outputs will be saved. If not provided, a timestamped directory 'r2s_YYYY-MM-DDTHH-MM-SS' is created. Subdirectories will be created for the neutron transport, activation, and photon transport steps. bounding_boxes : dict[int, openmc.BoundingBox], optional Dictionary mapping cell IDs to bounding boxes used for spatial source sampling in cell-based R2S calculations. Required if method is 'cell-based'. chain_file : PathLike, optional Path to the depletion chain XML file to use during activation. If not provided, the default configured chain file will be used. micro_kwargs : dict, optional Additional keyword arguments passed to :func:`openmc.deplete.get_microxs_and_flux` during the neutron transport step. mat_vol_kwargs : dict, optional Additional keyword arguments passed to :meth:`openmc.MeshBase.material_volumes`. run_kwargs : dict, optional Additional keyword arguments passed to :meth:`openmc.Model.run` during the neutron and photon transport step. By default, output is disabled. operator_kwargs : dict, optional Additional keyword arguments passed to :class:`openmc.deplete.IndependentOperator`. Returns ------- Path Path to the output directory containing all calculation results """ if output_dir is None: stamp = datetime.now().strftime('%Y-%m-%dT%H-%M-%S') output_dir = Path(f'r2s_{stamp}') # Set run_kwargs for the neutron transport step if micro_kwargs is None: micro_kwargs = {} if run_kwargs is None: run_kwargs = {} if operator_kwargs is None: operator_kwargs = {} run_kwargs.setdefault('output', False) micro_kwargs.setdefault('run_kwargs', run_kwargs) # If a chain file is provided, prefer it for steps 1 and 2 if chain_file is not None: micro_kwargs.setdefault('chain_file', chain_file) operator_kwargs.setdefault('chain_file', chain_file) self.step1_neutron_transport( output_dir / 'neutron_transport', mat_vol_kwargs, micro_kwargs ) self.step2_activation( timesteps, source_rates, timestep_units, output_dir / 'activation', operator_kwargs=operator_kwargs ) self.step3_photon_transport( photon_time_indices, bounding_boxes, output_dir / 'photon_transport', mat_vol_kwargs=mat_vol_kwargs, run_kwargs=run_kwargs ) return output_dir
[docs] def step1_neutron_transport( self, output_dir: PathLike = "neutron_transport", mat_vol_kwargs: dict | None = None, micro_kwargs: dict | None = None ): """Run the neutron transport step. This step computes the material volume fractions on the mesh, creates a mesh-material filter, and retrieves the fluxes and microscopic cross sections for each mesh/material combination. This step will populate the 'fluxes' and 'micros' keys in the results dictionary. For a mesh-based calculation, it will also populate the 'mesh_material_volumes' key. Parameters ---------- output_dir : PathLike, optional The directory where the results will be saved. mat_vol_kwargs : dict, optional Additional keyword arguments based to :meth:`openmc.MeshBase.material_volumes`. micro_kwargs : dict, optional Additional keyword arguments passed to :func:`openmc.deplete.get_microxs_and_flux`. """ output_dir = Path(output_dir) output_dir.mkdir(parents=True, exist_ok=True) if self.method == 'mesh-based': # Compute material volume fractions on the mesh if mat_vol_kwargs is None: mat_vol_kwargs = {} self.results['mesh_material_volumes'] = mmv = \ self.domains.material_volumes(self.neutron_model, **mat_vol_kwargs) # Save results to file mmv.save(output_dir / 'mesh_material_volumes.npz') # Create mesh-material filter based on what combos were found domains = openmc.MeshMaterialFilter.from_volumes(self.domains, mmv) else: domains: Sequence[openmc.Cell] = self.domains # Check to make sure that each cell is filled with a material and # that the volume has been set # TODO: If volumes are not set, run volume calculation for cells for cell in domains: if cell.fill is None: raise ValueError( f"Cell {cell.id} is not filled with a materials. " "Please set the fill material for each cell before " "running the R2S calculation." ) if cell.volume is None: raise ValueError( f"Cell {cell.id} does not have a volume set. " "Please set the volume for each cell before running " "the R2S calculation." ) # Set default keyword arguments for microxs and flux calculation if micro_kwargs is None: micro_kwargs = {} micro_kwargs.setdefault('path_statepoint', output_dir / 'statepoint.h5') micro_kwargs.setdefault('path_input', output_dir / 'model.xml') # Run neutron transport and get fluxes and micros self.results['fluxes'], self.results['micros'] = get_microxs_and_flux( self.neutron_model, domains, **micro_kwargs) # Save flux and micros to file np.save(output_dir / 'fluxes.npy', self.results['fluxes']) write_microxs_hdf5(self.results['micros'], output_dir / 'micros.h5')
[docs] def step2_activation( self, timesteps: Sequence[float] | Sequence[tuple[float, str]], source_rates: float | Sequence[float], timestep_units: str = 's', output_dir: PathLike = 'activation', operator_kwargs: dict | None = None, ): """Run the activation step. This step creates a unique copy of each activation material based on the mesh elements or cells, then solves the depletion equations for each material using the fluxes and microscopic cross sections obtained in the neutron transport step. This step will populate the 'depletion_results' and 'activation_materials' keys in the results dictionary. Parameters ---------- timesteps : Sequence[float] or Sequence[tuple[float, str]] Sequence of timesteps. Note that values are not cumulative. The units are specified by the `timestep_units` argument when `timesteps` is an iterable of float. Alternatively, units can be specified for each step by passing an iterable of (value, unit) tuples. source_rates : float | Sequence[float] Source rate in [neutron/sec] for each interval in `timesteps`. timestep_units : {'s', 'min', 'h', 'd', 'a'}, optional Units for values specified in the `timesteps` argument when passing float values. 's' means seconds, 'min' means minutes, 'h' means hours, 'd' means days, and 'a' means years (Julian). output_dir : PathLike, optional Path to directory where activation calculation outputs will be saved. operator_kwargs : dict, optional Additional keyword arguments passed to :class:`openmc.deplete.IndependentOperator`. """ if self.method == 'mesh-based': # Get unique material for each (mesh, material) combination mmv = self.results['mesh_material_volumes'] self.results['activation_materials'] = get_activation_materials(self.neutron_model, mmv) else: # Create unique material for each cell activation_mats = openmc.Materials() for cell in self.domains: mat = cell.fill.clone() mat.name = f'Cell {cell.id}' mat.depletable = True mat.volume = cell.volume activation_mats.append(mat) self.results['activation_materials'] = activation_mats # Save activation materials to file output_dir = Path(output_dir) output_dir.mkdir(parents=True, exist_ok=True) self.results['activation_materials'].export_to_xml( output_dir / 'materials.xml') # Create depletion operator for the activation materials if operator_kwargs is None: operator_kwargs = {} operator_kwargs.setdefault('normalization_mode', 'source-rate') op = IndependentOperator( self.results['activation_materials'], self.results['fluxes'], self.results['micros'], **operator_kwargs ) # Create time integrator and solve depletion equations integrator = PredictorIntegrator( op, timesteps, source_rates=source_rates, timestep_units=timestep_units ) output_path = output_dir / 'depletion_results.h5' integrator.integrate(final_step=False, path=output_path) # Get depletion results self.results['depletion_results'] = Results(output_path)
[docs] def step3_photon_transport( self, time_indices: Sequence[int] | None = None, bounding_boxes: dict[int, openmc.BoundingBox] | None = None, output_dir: PathLike = 'photon_transport', mat_vol_kwargs: dict | None = None, run_kwargs: dict | None = None, ): """Run the photon transport step. This step performs photon transport calculations using decay photon sources created from the activated materials. For each specified time, it creates appropriate photon sources and runs a transport calculation. In mesh-based mode, the sources are created using the mesh material volumes, while in cell-based mode, they are created using bounding boxes for each cell. This step will populate the 'photon_tallies' key in the results dictionary. Parameters ---------- time_indices : Sequence[int], optional Sequence of time indices at which photon transport should be run; represented as indices into the array of times formed by the timesteps. For example, if two timesteps are specified, the array of times would contain three entries, and [2] would indicate computing photon results at the last time. A value of None indicates to run photon transport for each time. bounding_boxes : dict[int, openmc.BoundingBox], optional Dictionary mapping cell IDs to bounding boxes used for spatial source sampling in cell-based R2S calculations. Required if method is 'cell-based'. output_dir : PathLike, optional Path to directory where photon transport outputs will be saved. mat_vol_kwargs : dict, optional Additional keyword arguments passed to :meth:`openmc.MeshBase.material_volumes`. run_kwargs : dict, optional Additional keyword arguments passed to :meth:`openmc.Model.run` during the photon transport step. By default, output is disabled. """ # TODO: Automatically determine bounding box for each cell if bounding_boxes is None and self.method == 'cell-based': raise ValueError("bounding_boxes must be provided for cell-based " "R2S calculations.") # Set default run arguments if not provided if run_kwargs is None: run_kwargs = {} run_kwargs.setdefault('output', False) # Write out JSON file with tally IDs that can be used for loading # results output_dir = Path(output_dir) output_dir.mkdir(parents=True, exist_ok=True) # Get default time indices if not provided if time_indices is None: n_steps = len(self.results['depletion_results']) time_indices = list(range(n_steps)) # Check whether the photon model is different neutron_univ = self.neutron_model.geometry.root_universe photon_univ = self.photon_model.geometry.root_universe different_photon_model = (neutron_univ != photon_univ) # For mesh-based calculations, compute material volume fractions for the # photon model if it is different from the neutron model to account for # potential material changes if self.method == 'mesh-based' and different_photon_model: self.results['mesh_material_volumes_photon'] = photon_mmv = \ self.domains.material_volumes(self.photon_model, **mat_vol_kwargs) # Save photon MMV results to file photon_mmv.save(output_dir / 'mesh_material_volumes.npz') tally_ids = [tally.id for tally in self.photon_model.tallies] with open(output_dir / 'tally_ids.json', 'w') as f: json.dump(tally_ids, f) self.results['photon_tallies'] = {} # Get dictionary of cells in the photon model if different_photon_model: photon_cells = self.photon_model.geometry.get_all_cells() for time_index in time_indices: # Create decay photon source if self.method == 'mesh-based': self.photon_model.settings.source = \ self.get_decay_photon_source_mesh(time_index) else: sources = [] results = self.results['depletion_results'] for cell, original_mat in zip(self.domains, self.results['activation_materials']): # Skip if the cell is not in the photon model or the # material has changed if different_photon_model: if cell.id not in photon_cells or \ cell.fill.id != photon_cells[cell.id].fill.id: continue # Get bounding box for the cell bounding_box = bounding_boxes[cell.id] # Get activated material composition activated_mat = results[time_index].get_material(str(original_mat.id)) # Create decay photon source source space = openmc.stats.Box(*bounding_box) energy = activated_mat.get_decay_photon_energy() strength = energy.integral() if energy is not None else 0.0 source = openmc.IndependentSource( space=space, energy=energy, particle='photon', strength=strength, constraints={'domains': [cell]} ) sources.append(source) self.photon_model.settings.source = sources # Convert time_index (which may be negative) to a normal index if time_index < 0: time_index = len(self.results['depletion_results']) + time_index # Run photon transport calculation run_kwargs['cwd'] = Path(output_dir) / f'time_{time_index}' statepoint_path = self.photon_model.run(**run_kwargs) # Store tally results with openmc.StatePoint(statepoint_path) as sp: self.results['photon_tallies'][time_index] = [ sp.tallies[tally.id] for tally in self.photon_model.tallies ]
[docs] def get_decay_photon_source_mesh( self, time_index: int = -1 ) -> list[openmc.MeshSource]: """Create decay photon source for a mesh-based calculation. This function creates N :class:`MeshSource` objects where N is the maximum number of unique materials that appears in a single mesh element. For each mesh element-material combination, and IndependentSource instance is created with a spatial constraint limited the sampled decay photons to the correct region. When the photon transport model is different from the neutron model, the photon MeshMaterialVolumes is used to determine whether an (element, material) combination exists in the photon model. Parameters ---------- time_index : int, optional Time index for the decay photon source. Default is -1 (last time). Returns ------- list of openmc.MeshSource A list of MeshSource objects, each containing IndependentSource instances for the decay photons in the corresponding mesh element. """ mat_dict = self.neutron_model._get_all_materials() # Some MeshSource objects will have empty positions; create a "null source" # that is used for this case null_source = openmc.IndependentSource(particle='photon', strength=0.0) # List to hold sources for each MeshSource (length = N) source_lists = [] # Index in the overall list of activated materials index_mat = 0 # Get various results from previous steps mat_vols = self.results['mesh_material_volumes'] materials = self.results['activation_materials'] results = self.results['depletion_results'] photon_mat_vols = self.results.get('mesh_material_volumes_photon') # Total number of mesh elements n_elements = mat_vols.num_elements for index_elem in range(n_elements): # Determine which materials exist in the photon model for this element if photon_mat_vols is not None: photon_materials = { mat_id for mat_id, _ in photon_mat_vols.by_element(index_elem) if mat_id is not None } for j, (mat_id, _) in enumerate(mat_vols.by_element(index_elem)): # Skip void volume if mat_id is None: continue # Skip if this material doesn't exist in photon model if photon_mat_vols is not None and mat_id not in photon_materials: index_mat += 1 continue # Check whether a new MeshSource object is needed if j >= len(source_lists): source_lists.append([null_source]*n_elements) # Get activated material composition original_mat = materials[index_mat] activated_mat = results[time_index].get_material(str(original_mat.id)) # Create decay photon source source energy = activated_mat.get_decay_photon_energy() if energy is not None: strength = energy.integral() source_lists[j][index_elem] = openmc.IndependentSource( energy=energy, particle='photon', strength=strength, constraints={'domains': [mat_dict[mat_id]]} ) # Increment index of activated material index_mat += 1 # Return list of mesh sources return [openmc.MeshSource(self.domains, sources) for sources in source_lists]
[docs] def load_results(self, path: PathLike): """Load results from a previous R2S calculation. Parameters ---------- path : PathLike Path to the directory containing the R2S calculation results. """ path = Path(path) # Load neutron transport results neutron_dir = path / 'neutron_transport' if self.method == 'mesh-based': mmv_file = neutron_dir / 'mesh_material_volumes.npz' if mmv_file.exists(): self.results['mesh_material_volumes'] = \ openmc.MeshMaterialVolumes.from_npz(mmv_file) fluxes_file = neutron_dir / 'fluxes.npy' if fluxes_file.exists(): self.results['fluxes'] = list(np.load(fluxes_file, allow_pickle=True)) micros_dict = read_microxs_hdf5(neutron_dir / 'micros.h5') self.results['micros'] = [ micros_dict[f'domain_{i}'] for i in range(len(micros_dict)) ] # Load activation results activation_dir = path / 'activation' activation_results = activation_dir / 'depletion_results.h5' if activation_results.exists(): self.results['depletion_results'] = Results(activation_results) activation_mats_file = activation_dir / 'materials.xml' if activation_mats_file.exists(): self.results['activation_materials'] = \ openmc.Materials.from_xml(activation_mats_file) # Load photon transport results photon_dir = path / 'photon_transport' # Load photon mesh material volumes if they exist (for mesh-based calculations) if self.method == 'mesh-based': photon_mmv_file = photon_dir / 'mesh_material_volumes.npz' if photon_mmv_file.exists(): self.results['mesh_material_volumes_photon'] = \ openmc.MeshMaterialVolumes.from_npz(photon_mmv_file) # Load tally IDs from JSON file tally_ids_path = photon_dir / 'tally_ids.json' if tally_ids_path.exists(): with tally_ids_path.open('r') as f: tally_ids = json.load(f) self.results['photon_tallies'] = {} # For each photon transport calc, load the statepoint and get the # tally results based on tally_ids for time_dir in photon_dir.glob('time_*'): time_index = int(time_dir.name.split('_')[1]) for sp_path in time_dir.glob('statepoint.*.h5'): with openmc.StatePoint(sp_path) as sp: self.results['photon_tallies'][time_index] = [ sp.tallies[tally_id] for tally_id in tally_ids ]