Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/python/picongpu/extra/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
from .memory_calculator import MemoryCalculator
from .field_ionization import FieldIonization
from . import FLYonPICRateCalculationReference
from .picmi_density_reader import CppFctReader

__all__ = ["FindTime", "MemoryCalculator", "FieldIonization", "FLYonPICRateCalculationReference"]
__all__ = ["FindTime", "MemoryCalculator", "FieldIonization", "FLYonPICRateCalculationReference", "CppFctReader"]
144 changes: 144 additions & 0 deletions lib/python/picongpu/extra/utils/picmi_density_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import numpy as np
import matplotlib.pyplot as plt
import sympy
import json


class CppFctReader:
"""class that alllows to evalute the PIConGPU free density
from a c++ code string

BE AWARE: the code assumes that:
1.) a free formular density profile was used in PICMI
2.) if sympy.Piecewise was used, that the c++ code first case, will not branch
"""

def __init__(self, s, debug=False):
"""constructor
s ... string from pypicongpu.json file
debug ... bool for debug output
"""
self.debug = debug
self.cpp_code = s.replace("\n", " ")
self.cpp_code = self.initial_clean(self.cpp_code)

def evaluate(self, x, symbol="y"):
"""evalute expression
x ... float: position [m] where to evaluate the density
symbol ... string: symbol to replace (default: 'y')
"""
return self.inner_evaluate(self.cpp_code, x, symbol=symbol) + 0.0 # add float for cast

def test_for_cases(self, s):
"""internal method checking for c++ cases
s ... string to check
"""
index_first_q = s.find("?")
index_first_c = s.find(":")
if index_first_q == -1 and index_first_c == -1:
return False
else:
return True

def inner_evaluate(self, s, x, symbol):
"""evalute string - this is a recursevly called internal method
s ... string code
x ... float value to evalute density at
symbol ... string symbol to replace
"""
if self.test_for_cases(s):
if self.debug:
print("CASES")
return self.eval_cases(s, x, symbol)

else:
s_sym = sympy.parsing.sympy_parser.parse_expr(s)
res = s_sym.subs(symbol, x)
if self.debug:
print("eval:", res)
return res

def eval_cases(self, s, x, symbol):
"""handle c++ cases of form condition ? case1 : case 2
s ... string code
x ... float value to evalute
symbol ... symbol to replace in string s
"""
if self.debug:
print("-->", s)
s = self.clean_substring(s)
if self.debug:
print("==>", s)
index_first_q = s.find("?")
index_first_c = s.find(":") # this assumes that there are no cases in the first branch

condition = s[0:index_first_q]
case1 = s[index_first_q + 1 : index_first_c]
case2 = s[index_first_c + 1 :]
Comment on lines +72 to +77
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Algorithm for detecting matching parenthesis (adapted for this problem):

from itertools import groupby
from sympy import Piecewise


def nesting_level(c):
    if c == "?":
        return 1
    if c == ":":
        return -1
    return 0


class Counter:
    counter = 0

    def __add__(self, addition):
        self.counter += addition
        return self.counter


def parse_branchless(string):
    # ... do parsing with guaranteed no ternary operators...
    return string


def parse_with_keys(keys_and_expressions):
    if len(keys_and_expressions) == 1:
        return parse_branchless(keys_and_expressions[0][1].strip("?:() "))

    condition = parse_with_keys(keys_and_expressions[:1])
    # poor man's index finding because list.index doesn't support key functions
    else_index = (
        next(
            filter(
                lambda x: x[1][0] == keys_and_expressions[0][0],
                enumerate(keys_and_expressions[1:]),
            )
        )[0]
        + 1
    )
    then_expression = parse_with_keys(keys_and_expressions[1:else_index])
    else_expression = parse_with_keys(keys_and_expressions[else_index:])
    # should be
    # return Piecewise([(then_expression, condition), (else_expression, True)])
    # but I didn't implement the parse function
    return [(then_expression, condition), (else_expression, True)]


def parse(string):
    counter = Counter()
    return parse_with_keys(
        list(
            map(
                lambda x: (x[0], "".join(x[1])),
                groupby(s, lambda c: counter + nesting_level(c)),
            )
        )
    )


if __name__ == "__main__":
    # a little demonstration
    s = "c>1 ? (c<3 ? 5: 6) : (c<-5 ? 42 : 2)"
    print(parse(s))

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add this tomorrow

Copy link
Member Author

@PrometheusPi PrometheusPi Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might need some more time to do so


if self.debug:
print("if")
print(condition)
print("do")
print(case1)
print("else")
print(case2)
print("fi")

if self.inner_evaluate(condition, x, symbol):
return self.inner_evaluate(case1, x, symbol)
else:
return self.inner_evaluate(case2, x, symbol)

def initial_clean(self, s):
"""initially clean string from c++/picongpu methods
making it readably for sympy
s ... string
"""
if self.debug:
print("clean before:", s)
s = s.replace("^", "**")
s = s.replace("pmacc::math::", " ")
if self.debug:
print("clean after:", s)
return s

def clean_substring(self, s):
"""clean any substrings from cases from paraneties and whitespace
s ... string
"""
s.strip()

while s.startswith("("):
if not s.endswith(")"):
raise ValueError(f"Missing closing parenthesis in {s=}.")
s = s[1:-1]
return s


if __name__ == "__main__":
# load pypicongpu.json, convert json to dict and extract equation for density
# a pypicongpu.json is created for every PICMI call
with open("pypicongpu.json") as file:
sim_dict = json.load(file)
density_fct_str = sim_dict["species_initmanager"]["operations"]["simple_density"][0]["profile"]["data"][
"function_body"
]
Comment on lines +122 to +126
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heads-up: Nitpicking coming your way!

I'd probably even unindent that last line. That way the file will only be kept open for as long as it is really necessary. In general, that's a good thing to aim for in order to avoid various risks of file corruption, blocking other code's access, etc. but also to more clearly communicate the dependencies in your code. (json.load(...)'s return value might still hold a reference to the file and its [] operators could potentially rely on that file still being open.)

In practice, the dictionary look-up afterwards is certainly completely irrelevant in that respect but one might choose following such style rules in the harmless and simple cases, so they come naturally in the dangerous and difficult-to-decipher ones as well.

Suggested change
with open("pypicongpu.json") as file:
sim_dict = json.load(file)
density_fct_str = sim_dict["species_initmanager"]["operations"]["simple_density"][0]["profile"]["data"][
"function_body"
]
with open("pypicongpu.json") as file:
sim_dict = json.load(file)
density_fct_str = sim_dict["species_initmanager"]["operations"]["simple_density"][0]["profile"]["data"][
"function_body"
]


# create cpp_fct_reader class for later evaluation
reader = CppFctReader(density_fct_str)

# define positions where to evaluate the density
x_array = np.linspace(0.0, 5.0e-3, 1000)
n_array = np.zeros_like(x_array)

# evalute density
for i, x in enumerate(x_array):
n_array[i] = reader.evaluate(x)

# plot density
plt.plot(x_array, n_array)
plt.xlabel(r"$y \, \mathrm{[m]}$")
plt.ylabel(r"$n \, \mathrm{[m^-3]}$")
plt.yscale("log")
plt.show()