Source code for wwinpy.parser

"""
WWINP file parser module.

This module provides functionality for reading and parsing MCNP weight window input (WWINP) files.
Contains optimized parsing functions for handling large data files efficiently.
"""

from typing import Iterator
import itertools
from wwinpy._utils import verify_and_correct
from wwinpy._exceptions import WWINPFormatError
from wwinpy.header import Header
from wwinpy.ww_data import WWData
from wwinpy.mesh import Mesh
from wwinpy.geometry import GeometryData, GeometryAxis
from wwinpy.weight_windows import WeightWindowValues
import numpy as np

def _tokenize_file(file_path: str) -> Iterator[str]:
    """Generate tokens from a WWINP file line by line.

    :param file_path: Path to the WWINP file
    :type file_path: str
    :return: Iterator over tokens in the file
    :rtype: Iterator[str]
    :raises WWINPFormatError: If an empty line is encountered
    """
    with open(file_path, "r") as f:
        for line_number, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                raise WWINPFormatError(f"Empty line detected at line {line_number}.")
            for token in line.split():
                yield token

[docs] def from_file(file_path: str, verbose: bool = False) -> WWData: """Parse a WWINP file and create a WWData object. :param file_path: Path to the WWINP file to parse :type file_path: str :param verbose: If True, print detailed parsing information :type verbose: bool :return: Parsed WWINP data object :rtype: WWData :raises WWINPFormatError: If the file format is invalid :raises WWINPParsingError: If there are errors during parsing """ token_gen = _tokenize_file(file_path) # --------------------------- # Block 1: Parse Header # --------------------------- if verbose: print(f"Reading file: {file_path}") try: # Read first 4 tokens as integers header_tokens = list(itertools.islice(token_gen, 4)) if len(header_tokens) < 4: raise WWINPFormatError("File ended unexpectedly while reading the header.") if_, iv, ni, nr = map(int, header_tokens) if verbose: print(f"Header values: if={if_}, iv={iv}, ni={ni}, nr={nr}") # Attempt to read probid try: next_token = next(token_gen) if next_token.isdigit(): probid = "" token_gen = itertools.chain([next_token], token_gen) else: probid = next_token except StopIteration: probid = "" if verbose: print(f"probid='{probid}'") except StopIteration: raise WWINPFormatError("File ended unexpectedly while reading the header.") except ValueError: raise WWINPFormatError("Header contains non-integer values.") header = Header(if_=if_, iv=iv, ni=ni, nr=nr, probid=probid) # Parse nt if iv=2 if iv == 2: if verbose: print("Parsing nt array (iv=2)...") try: header.nt = list(map(int, itertools.islice(token_gen, ni))) if len(header.nt) < ni: raise WWINPFormatError("File ended while reading nt array.") if verbose: for i, val in enumerate(header.nt): print(f" nt[{i}] = {val}") except StopIteration: raise WWINPFormatError("File ended while reading nt array.") # Parse ne array if verbose: print("Parsing ne array...") try: header.ne = list(map(int, itertools.islice(token_gen, ni))) if len(header.ne) < ni: raise WWINPFormatError("File ended while reading ne array.") if verbose: for i, val in enumerate(header.ne): print(f" ne[{i}] = {val}") except StopIteration: raise WWINPFormatError("File ended while reading ne array.") # --------------------------- # Verification and Correction # --------------------------- if verbose: print("\n=== Verifying and Correcting Data ===") updated_ni, updated_nt, updated_ne = verify_and_correct( ni=header.ni, nt=header.nt if iv == 2 else None, ne=header.ne, iv=iv, verbose=verbose ) header.ni = updated_ni if iv == 2: header.nt = updated_nt header.ne = updated_ne if verbose: print(f" Updated ni: {header.ni}") for i in range(header.ni): if iv == 2: print(f" Updated nt[{i}]: {header.nt[i]}") print(f" Updated ne[{i}]: {header.ne[i]}") # Parse geometry parameters if verbose: print("\nParsing geometry parameters...") try: geom_tokens = list(itertools.islice(token_gen, 6)) if len(geom_tokens) < 6: raise WWINPFormatError("Not enough tokens for geometry parameters.") header.nfx, header.nfy, header.nfz, header.x0, header.y0, header.z0 = map(float, geom_tokens) if verbose: print(f" nfx={header.nfx}, nfy={header.nfy}, nfz={header.nfz}") print(f" x0={header.x0}, y0={header.y0}, z0={header.z0}") except StopIteration: raise WWINPFormatError("File ended while reading geometry parameters.") # Parse nr-dependent values if verbose: print(f"Parsing nr={nr} specific values...") try: if nr == 10: nr_tokens = list(itertools.islice(token_gen, 4)) if len(nr_tokens) < 4: raise WWINPFormatError("Not enough tokens for [nr=10] line.") header.ncx, header.ncy, header.ncz, header.nwg = map(float, nr_tokens) if verbose: print(f" ncx={header.ncx}, ncy={header.ncy}, ncz={header.ncz}, nwg={header.nwg}") elif nr == 16: nr_tokens1 = list(itertools.islice(token_gen, 6)) nr_tokens2 = list(itertools.islice(token_gen, 4)) if len(nr_tokens1) < 6 or len(nr_tokens2) < 4: raise WWINPFormatError("Not enough tokens for [nr=16] lines.") header.ncx, header.ncy, header.ncz, header.x1, header.y1, header.z1 = map(float, nr_tokens1) header.x2, header.y2, header.z2, header.nwg = map(float, nr_tokens2) if verbose: print(f" ncx={header.ncx}, ncy={header.ncy}, ncz={header.ncz}") print(f" x1={header.x1}, y1={header.y1}, z1={header.z1}") print(f" x2={header.x2}, y2={header.y2}, z2={header.z2}, nwg={header.nwg}") else: raise WWINPFormatError(f"Unsupported nr value: {nr}") except StopIteration: raise WWINPFormatError("File ended while reading nr-dependent values.") # --------------------------- # Block 2: Geometry # --------------------------- if verbose: print("\n=== Parsing Geometry Block ===") ncx, ncy, ncz = int(header.ncx), int(header.ncy), int(header.ncz) if verbose: print(f"Mesh dimensions: ncx={ncx}, ncy={ncy}, ncz={ncz}") # Function to parse axis data using GeometryAxis with NumPy arrays def parse_axis(axis_name: str, n_segments: int, verbose: bool) -> GeometryAxis: axis = GeometryAxis(origin=0.0) # Initialize with default origin try: origin = float(next(token_gen)) axis.origin = origin if verbose: print(f"{axis_name}-axis origin: {origin}") except StopIteration: raise WWINPFormatError(f"File ended while reading {axis_name}-axis origin.") for i in range(n_segments): try: q, p, s = map(float, itertools.islice(token_gen, 3)) axis.add_segment(q, p, s) if verbose: print(f" {axis_name}-segment[{i}]: q={q}, p={p}, s={s}") except StopIteration: raise WWINPFormatError(f"File ended while reading {axis_name}_segments.") return axis x_axis = parse_axis("X", ncx, verbose) y_axis = parse_axis("Y", ncy, verbose) z_axis = parse_axis("Z", ncz, verbose) geometry = GeometryData( header=header, x_axis=x_axis, y_axis=y_axis, z_axis=z_axis ) # --------------------------- # Block 3: Values (Times, Energies, WW-Values) # --------------------------- if verbose: print("\n=== Parsing Values Block ===") # 1) Read time bins (if iv=2) if verbose: print("Reading time bins...") if iv == 2: time_bins_all = [] for i in range(header.ni): try: t_bins = np.fromiter( (float(token) for token in itertools.islice(token_gen, header.nt[i])), dtype=np.float64, count=header.nt[i] ) if t_bins.size < header.nt[i]: raise WWINPFormatError(f"File ended while reading time bins for particle {i}.") time_bins_all.append(t_bins) if verbose: for j, val in enumerate(t_bins): print(f" t[{i}][{j}] = {val}") except StopIteration: raise WWINPFormatError(f"File ended while reading time bins for particle {i}.") else: # No time dependency; assign a default single-element array for each particle default_time = 0.0 time_bins_all = [np.array([default_time]) for _ in range(header.ni)] if verbose: for i in range(header.ni): print(f" t[{i}] = []") # 2) Read energy bins if verbose: print("Reading energy bins...") energy_bins_all = [] for i in range(header.ni): try: e_bins = np.fromiter( (float(token) for token in itertools.islice(token_gen, header.ne[i])), dtype=np.float64, count=header.ne[i] ) if e_bins.size < header.ne[i]: raise WWINPFormatError(f"File ended while reading energy bins for particle {i}.") energy_bins_all.append(e_bins) if verbose: for j, val in enumerate(e_bins): print(f" e[{i}][{j}] = {val}") except StopIteration: raise WWINPFormatError(f"File ended while reading energy bins for particle {i}.") # 3) Read ww-values with enhanced verbosity if verbose: print("Reading ww-values...") # Precompute number of geometry cells num_geom_cells = int(header.nfx * header.nfy * header.nfz) # Assuming ncx, ncy, ncz are integers if verbose: print(f"Total geometry cells: {num_geom_cells}") # Initialize counters for verbose logging total_expected_ww_values = 0 for i in range(header.ni): time_max_range = header.nt[i] if iv == 2 else 1 total_expected_ww_values += time_max_range * header.ne[i] * num_geom_cells print(f"Total expected ww-values: {total_expected_ww_values}") # Create dictionaries for time and energy bins time_mesh_dict = {} energy_mesh_dict = {} ww_values_dict = {} for i in range(header.ni): time_mesh_dict[i] = time_bins_all[i] energy_mesh_dict[i] = energy_bins_all[i] try: for i in range(header.ni): time_bins = time_bins_all[i] energy_bins = energy_bins_all[i] ww_all = np.empty((len(time_bins), len(energy_bins), num_geom_cells)) for t in range(len(time_bins)): for e in range(len(energy_bins)): ww_all[t, e, :] = np.fromiter( (float(token) for token in itertools.islice(token_gen, num_geom_cells)), dtype=np.float64, count=num_geom_cells ) ww_values_dict[i] = ww_all except StopIteration: raise WWINPFormatError("File ended unexpectedly while reading ww-values.") if verbose: total_ww_values_read = sum(p.size for p in ww_values_dict.values()) print(f"Total ww-values read: {total_ww_values_read}") if total_ww_values_read < total_expected_ww_values: print(f"Warning: Expected {total_expected_ww_values} ww-values, but only {total_ww_values_read} were read.") # Create the mesh with time and energy data mesh = Mesh( header=header, geometry=geometry, time_mesh=time_mesh_dict, energy_mesh=energy_mesh_dict, ) values = WeightWindowValues( header=header, mesh=mesh, ww_values=ww_values_dict ) # Finally construct the WWData wwinp_data = WWData( header=header, mesh=mesh, values=values ) return wwinp_data