Skip to content

Utilities API

The builder functions are convenience wrappers that wire together an Environment, a ShopFloor, a Router, and a release policy into a ready-to-run system, so you can spin up a complete simulation in a single call. The distribution helpers sample processing times, with RunningStats providing an online (Welford) accumulator for mean and variance. SimLogger records simulation events and can emit them as JSON, plain text, or to a SQLite store.

Builders

build_immediate_release_system

build_immediate_release_system(
    *,
    env: Environment,
    scenario: Scenario = Scenario(),
    priority_policies: Callable[..., float] | None = None,
    collect_workload: bool = False,
    collect_time_series: bool = False,
    retain_job_history: bool = False,
) -> BuiltSystem[None]

Build an immediate release (push) system with no workload control.

Creates a simple push system where jobs enter the shopfloor immediately upon arrival without any release control.

Parameters:

Name Type Description Default
env Environment

The simulation environment.

required
scenario Scenario

Environment description (shop type, machine count, arrival process, service-time, due-date rule). Defaults to a 6-machine pure job shop at rho=0.90.

Scenario()
priority_policies Callable[..., float] | None

Optional callable used to assign job priorities at servers.

None
collect_workload bool

If True, attach a CurrentWorkLoadCollector.

False
collect_time_series bool

If True, servers collect queue length time series.

False
retain_job_history bool

If True, servers retain completed job references.

False

Returns:

Type Description
BuiltSystem[None]

BuiltSystem[None] with psp=None (push system; no release control)

BuiltSystem[None]

and policy=None (no policy object). Unpacks as

BuiltSystem[None]

psp, servers, shop_floor, router, policy.

Example

env = Environment() _, servers, shop_floor, router, _ = build_immediate_release_system(env=env) env.run(until=1000) print(f"Jobs completed: {len(shop_floor.jobs_done)}")

build_focus_system

build_focus_system(
    *,
    env: Environment,
    scenario: Scenario = Scenario(),
    focus_weights: tuple[
        float, float, float, float, float
    ] = (0.25, 0.25, 0.25, 0.25, 0.0),
    collect_workload: bool = False,
) -> BuiltSystem[None]

Build an immediate-release (push) system that dispatches with FOCUS.

Jobs enter the shopfloor on arrival (no release control); queue ordering at every server uses the FOCUS self-establishing rule (Kasper, Land, Teunter 2023, Omega 114, 102726) via FocusPriorityRule. Use this to study FOCUS as a standalone dispatching rule, independent of DRACO.

Parameters:

Name Type Description Default
env Environment

The simulation environment.

required
scenario Scenario

Environment description (shop type, machine count, arrival process, service-time, due-date rule). Defaults to a 6-machine pure job shop at rho=0.90.

Scenario()
focus_weights tuple[float, float, float, float, float]

FOCUS mechanism weights (w1, w2, w3, w4, w5) for (pi, omega, psi, gamma, beta); must each be in [0, 1] and sum to 1. Defaults to beta-dormant (0.25, 0.25, 0.25, 0.25, 0.0).

(0.25, 0.25, 0.25, 0.25, 0.0)
collect_workload bool

If True, attach a CurrentWorkLoadCollector.

False

Returns:

Type Description
BuiltSystem[None]

BuiltSystem[None] with psp=None (push system; no PSP) and

BuiltSystem[None]

policy=None. The FOCUS dispatching rule remains reachable via

BuiltSystem[None]

router.priority_policies. Unpacks as

BuiltSystem[None]

psp, servers, shop_floor, router, policy.

Example

env = Environment() _, servers, shop_floor, router, _ = build_focus_system(env=env) env.run(until=1000)

References

Kasper, A., Land, M., Teunter, R. (2023). Towards system state dispatching in high-variety manufacturing. Omega, 114, 102726. https://doi.org/10.1016/j.omega.2022.102726

build_lumscor_system

build_lumscor_system(
    *,
    env: Environment,
    scenario: Scenario = Scenario(),
    check_timeout: float,
    wl_norm_level: float,
    allowance_factor: int,
    collect_workload: bool = False,
) -> BuiltSystem[LumsCor]

Build a LumsCor (load-based) pull system with workload control.

Creates a pull system using LUMS-COR (Land's Upper limit for Make-Span with CORrected workload) release policy. Jobs are held in a Pre-Shop Pool and released only when server workloads stay below configured norms.

Uses CorrectedWIPStrategy which discounts downstream workload by position, and includes starvation avoidance triggers for idle servers.

Parameters:

Name Type Description Default
env Environment

The simulation environment.

required
scenario Scenario

Environment description (shop type, machine count, arrival process, service-time, due-date rule). Defaults to a 6-machine pure job shop at rho=0.90.

Scenario()
check_timeout float

Time between pool release checks.

required
wl_norm_level float

Workload norm threshold for each server. Jobs are released only if adding them keeps corrected WIP at or below this level.

required
allowance_factor int

Buffer time per server for due date calculation. Higher values result in earlier (more conservative) releases.

required
collect_workload bool

If True, attach a CurrentWorkLoadCollector.

False

Returns:

Type Description
BuiltSystem[LumsCor]

BuiltSystem[LumsCor] whose policy is the wired LumsCor

BuiltSystem[LumsCor]

instance (inspect wl_norm, retune mid-run). Unpacks as

BuiltSystem[LumsCor]

psp, servers, shop_floor, router, policy.

Example

env = Environment() psp, servers, shop_floor, router, policy = build_lumscor_system( ... env=env, check_timeout=10.0, wl_norm_level=5.0, allowance_factor=2 ... ) env.run(until=1000)

References

Land, M.J. (2006). Parameters and sensitivity in workload control. International Journal of Production Economics, 104(2), 625-638. https://doi.org/10.1016/j.ijpe.2005.03.001

build_slar_system

build_slar_system(
    *,
    env: Environment,
    scenario: Scenario = Scenario(),
    allowance_factor: float,
    collect_workload: bool = False,
) -> BuiltSystem[Slar]

Build a SLAR (Superfluous Load Avoidance Release) pull system.

Creates a pull system using SLAR release policy based on planned slack times (PST). Jobs are released from the Pre-Shop Pool when servers risk starvation or when urgent jobs need insertion.

Release triggers
  • Starvation avoidance: When queue is empty or has one job, release the job with earliest planned start time.
  • Urgent job insertion: When all queued jobs are non-urgent, insert the most urgent job with shortest processing time.

Parameters:

Name Type Description Default
env Environment

The simulation environment.

required
allowance_factor float

Slack allowance per operation (parameter 'k' in paper). Higher values provide more buffer time per server.

required
scenario Scenario

Environment description (shop type, machine count, arrival process, service-time, due-date rule). Defaults to a 6-machine pure job shop at rho=0.90.

Scenario()
collect_workload bool

If True, attach a CurrentWorkLoadCollector.

False

Returns:

Type Description
BuiltSystem[Slar]

BuiltSystem[Slar] whose policy is the wired Slar instance.

BuiltSystem[Slar]

Unpacks as psp, servers, shop_floor, router, policy.

Example

env = Environment() psp, servers, shop_floor, router, policy = build_slar_system( ... env=env, allowance_factor=3.0 ... ) env.run(until=1000)

References

Land, M.J. & Gaalman, G.J.C. (1998). The performance of workload control concepts in job shops: Improving the release method. International Journal of Production Economics, 56-57, 347-364. https://doi.org/10.1016/S0925-5273(98)00052-8

build_slar_limit_system

build_slar_limit_system(
    *,
    env: Environment,
    scenario: Scenario = Scenario(),
    allowance_factor: float,
    wl_norm_level: float,
    collect_workload: bool = False,
) -> BuiltSystem[SlarLimit]

Build a SLAR-Limit (load-bounded SLAR) pull system.

Creates a pull system using SLAR-Limit release policy. SLAR-Limit extends classic SLAR by adding a workload-norm limit to the urgent insertion branch: urgent PSP candidates are iterated in ascending SPT order and the first whose corrected workload contribution PT/(i+1) fits all server norms is released. If none fits, the policy falls back to the standard SLAR postponed-starvation branch.

Uses CorrectedWIPStrategy on the shopfloor (required by the norm check).

Parameters:

Name Type Description Default
env Environment

The simulation environment.

required
allowance_factor float

Slack allowance per operation (parameter 'k' in the SLAR paper).

required
wl_norm_level float

Workload norm threshold applied uniformly to every server. An urgent PSP candidate is released only if adding its corrected contribution keeps every server in its routing at or below this level.

required
scenario Scenario

Environment description (shop type, machine count, arrival process, service-time, due-date rule). Defaults to a 6-machine pure job shop at rho=0.90.

Scenario()
collect_workload bool

If True, attach a CurrentWorkLoadCollector to the shopfloor for workload time-series.

False

Returns:

Type Description
BuiltSystem[SlarLimit]

BuiltSystem[SlarLimit] whose policy is the wired SlarLimit

BuiltSystem[SlarLimit]

instance (inspect wl_norm). Unpacks as

BuiltSystem[SlarLimit]

psp, servers, shop_floor, router, policy.

Example

env = Environment() psp, servers, shop_floor, router, policy = build_slar_limit_system( ... env=env, allowance_factor=3.0, wl_norm_level=5.0 ... ) env.run(until=1000)

References

Thürer, M. & Stevenson, M. (2021). Improving superfluous load avoidance release (SLAR): A new load-based SLAR mechanism. International Journal of Production Economics, 231, 107881. https://doi.org/10.1016/j.ijpe.2020.107881

build_draco_system

build_draco_system(
    *,
    env: Environment,
    scenario: Scenario = Scenario(),
    wip_target: int,
    loop_target: int,
    focus_weights: tuple[
        float, float, float, float, float
    ] = (0.25, 0.25, 0.25, 0.25, 0.0),
    total_impact_weights: tuple[float, float, float] = (
        0.25,
        0.25,
        0.5,
    ),
    collect_workload: bool = False,
) -> BuiltSystem[Draco]

Build a DRACO (non-hierarchical WIP control) pull system.

Creates a pull system using the DRACO policy (Kasper, Land, Teunter 2023, IJPE 257, 108768) that merges release, authorization, and dispatching into a single per-server decision. On every job completion at any server k, DRACO scores every candidate in Q_k ∪ P_k by a weighted total impact w^R·R + w^A·A + w^D·D and selects the maximum. The winner is dispatched to k next.

Wiring (performed by Draco.__init__): - priority_policy: Draco.priority_policy (queue-side DRACO score; returns -inf one-shot for forced PSP winners to preserve strict paper semantics). - shop_floor.on_processing_end: Draco.decide_next_job. - psp.on_arrival(starvation_avoidance): prevents idle-server starvation when a new arrival's first server is idle.

Parameters:

Name Type Description Default
env Environment

The simulation environment.

required
wip_target int

Target shop WIP τ (count of jobs).

required
loop_target int

Target overlapping loop ε_{k,u}. Scalar applied to every pair; for per-pair targets, instantiate Draco directly with a dict[(Server, Server), int].

required
focus_weights tuple[float, float, float, float, float]

FOCUS mechanism weights (w1, w2, w3, w4, w5).

(0.25, 0.25, 0.25, 0.25, 0.0)
total_impact_weights tuple[float, float, float]

(w^R, w^A, w^D) for the DRACO total impact; must sum to 1. Defaults to (0.25, 0.25, 0.5) — the paper's full DRACO configuration (Table 2).

(0.25, 0.25, 0.5)
scenario Scenario

Environment description (shop type, machine count, arrival process, service-time, due-date rule). Defaults to a 6-machine pure job shop at rho=0.90.

Scenario()
collect_workload bool

If True, attach a CurrentWorkLoadCollector.

False

Returns:

Type Description
BuiltSystem[Draco]

BuiltSystem[Draco] whose policy is the wired Draco instance.

BuiltSystem[Draco]

Unpacks as psp, servers, shop_floor, router, policy.

Example

env = Environment() psp, servers, shop_floor, router, policy = build_draco_system( ... env=env, wip_target=8, loop_target=4 ... ) env.run(until=1000)

References

Kasper, A., Land, M., Teunter, R. (2023). Non-hierarchical work-in-progress control in manufacturing. International Journal of Production Economics, 257, 108768. https://doi.org/10.1016/j.ijpe.2022.108768

Scenario

Scenario dataclass

Immutable description of a shop environment and its order stream.

Attributes:

Name Type Description
arrival_process Callable[[float], Callable[[], float]]

Factory producing the inter-arrival sampler. It is called with the resolved arrival rate (orders per time unit, from resolved_arrival_rate) and must return a zero-argument sampler of inter-arrival times whose mean is 1 / rate. The default Exponential satisfies this (Poisson arrivals: Exponential(rate) has mean 1 / rate). Passing a factory whose sampler mean is not 1 / rate — e.g. Erlang, whose Erlang(rate) has mean shape / rate — silently breaks the target_utilization calibration even though it satisfies the declared type.

arrival_rate float | None

Explicit arrival rate (orders per time unit) that overrides the mix-weighted derivation when not None; resolved_arrival_rate returns it verbatim, skipping the ρ→λ derivation.

target_utilization float

Target steady-state shop utilization ρ, a fraction in (0, 1]; drives the arrival-rate derivation unless arrival_rate is set.

families tuple[SkuFamily, ...]

The product mix (one or more SkuFamily entries); each carries its own service time, due-date offset/rule, and mix weight.

due_date_offset Distribution

Default due-date offset distribution applied to families that do not set their own due_date_offset.

arrival_process class-attribute instance-attribute

arrival_process: Callable[[float], Callable[[], float]] = (
    Exponential
)

arrival_rate class-attribute instance-attribute

arrival_rate: float | None = None

due_date_offset class-attribute instance-attribute

due_date_offset: Distribution = Uniform(low=30.0, high=45.0)

families class-attribute instance-attribute

families: tuple[SkuFamily, ...] = (SkuFamily(),)

n_servers class-attribute instance-attribute

n_servers: int = 6

shop_type class-attribute instance-attribute

shop_type: ShopType = PJS

target_utilization class-attribute instance-attribute

target_utilization: float = 0.9

__init__

__init__(
    shop_type: ShopType = ShopType.PJS,
    n_servers: int = 6,
    target_utilization: float = 0.9,
    families: tuple[SkuFamily, ...] = (SkuFamily(),),
    due_date_offset: Distribution = Uniform(
        low=30.0, high=45.0
    ),
    arrival_process: Callable[
        [float], Callable[[], float]
    ] = Exponential,
    arrival_rate: float | None = None,
) -> None

__post_init__

__post_init__() -> None

build_floor

build_floor(
    env: Environment,
    *,
    collect_workload: bool = False,
    collect_time_series: bool = False,
    retain_job_history: bool = False,
) -> tuple[ShopFloor, tuple[Server, ...]]

Create the ShopFloor and n_servers single-capacity servers.

build_router

build_router(
    env: Environment,
    shop_floor: ShopFloor,
    servers: Sequence[Server],
    *,
    psp: PreShopPool | None,
    priority_policies: Callable[..., float] | None = None,
) -> Router

Assemble the Router from the family mix: arrival process, per-family routing, service-time distributions, and due-date offsets/rules.

Parameters:

Name Type Description Default
env Environment

The simulation environment.

required
shop_floor ShopFloor

The shop floor the router feeds jobs into.

required
servers Sequence[Server]

The server pool the routings and service-time maps are built over. Its length must equal self.n_servers: that count drives the arrival-rate derivation (via resolved_arrival_rate, used for both the expected routing length E[L] and the machine count M), so a mismatch would silently miscalibrate utilization.

required
psp PreShopPool | None

The pre-shop pool, or None for immediate release.

required
priority_policies Callable[..., float] | None

Optional priority policy callable for the router.

None

Raises:

Type Description
ValueError

If len(servers) != self.n_servers.

general_flow_shop classmethod

general_flow_shop(**overrides: object) -> Scenario

General Flow Shop preset (directed/sorted routing).

pure_flow_shop classmethod

pure_flow_shop(**overrides: object) -> Scenario

Pure Flow Shop preset (all machines, fixed order).

pure_job_shop classmethod

pure_job_shop(**overrides: object) -> Scenario

Pure Job Shop preset (undirected routing).

resolved_arrival_rate

resolved_arrival_rate() -> float

The exponential arrival rate (explicit override, else mix-weighted derivation).

The returned rate (orders per time unit) is fed to arrival_process in build_router, which expects a sampler of inter-arrival times with mean 1 / rate (the default Exponential satisfies this).

single classmethod

single(
    *,
    service_time: Distribution | None = None,
    due_date_offset: Distribution | None = None,
    twk_allowance_factor: float | None = None,
    name: str = "F1",
    **shop: object,
) -> Scenario

Convenience for the common one-product case: build a single-family Scenario.

name is always forwarded (it has a default), while service_time, due_date_offset, and twk_allowance_factor are forwarded only when non-None, so SkuFamily's own defaults fill the rest. **shop forwards shop-level kwargs (shop_type, n_servers, target_utilization, arrival_rate, ...).

ShopType

Bases: Enum

Standard workload-control benchmark shop types (by routing directedness).

Distributions

pure_job_shop_routing

pure_job_shop_routing(
    servers: Sequence[Server],
) -> Callable[[], Sequence[Server]]

Create a Pure Job Shop (PJS) routing factory.

The Pure Job Shop — also called a randomly routed job shop (Conway et al., 1967) — is the least directed of the standard workload-control benchmark shops: each order has a random routing length and a random routing direction (Oosterman, Land & Gaalman, 2000; Kasper, Land & Teunter, 2023). The factory draws a routing length k uniformly from [1, len(servers)] and returns k distinct servers in random order (sampling without replacement, so re-entrant flow is prohibited).

See general_flow_shop_routing (same length rule, but the subset is sorted into a directed flow) and pure_flow_shop_routing (fixed length, fully directed) for the directed counterparts on the directedness spectrum.

Parameters:

Name Type Description Default
servers Sequence[Server]

The pool of servers to sample from. Its order defines the canonical machine index used by the directed sibling factories.

required

Returns:

Type Description
Callable[[], Sequence[Server]]

A callable that, when invoked, returns a random subset of servers

Callable[[], Sequence[Server]]

(between 1 and len(servers) inclusive) in random order, without

Callable[[], Sequence[Server]]

replacement.

Example

routing = pure_job_shop_routing(servers) routing() # Returns e.g., [server_2, server_5, server_1]

References

Oosterman, B., Land, M., & Gaalman, G. (2000). The influence of shop characteristics on workload control. International Journal of Production Economics, 68(1), 107-119. https://doi.org/10.1016/S0925-5273(99)00141-3

general_flow_shop_routing

general_flow_shop_routing(
    servers: Sequence[Server],
) -> Callable[[], Sequence[Server]]

Create a General Flow Shop (GFS) routing factory.

The General Flow Shop is the directed counterpart of the Pure Job Shop: each order has the same random routing length k ~ U[1, len(servers)] and the same equal per-machine inclusion probability, but the selected machines are sorted into ascending index order so that orders flow in a single direction with typical upstream and downstream stations (Oosterman, Land & Gaalman, 2000; Kasper, Land & Teunter, 2023). Machines are drawn without replacement, so re-entrant flow is prohibited.

Parameters:

Name Type Description Default
servers Sequence[Server]

The pool of servers to sample from. Their order defines the canonical machine index along which routings are directed.

required

Returns:

Type Description
Callable[[], Sequence[Server]]

A callable that, when invoked, returns a random subset of servers

Callable[[], Sequence[Server]]

(between 1 and len(servers) inclusive), distinct and ordered by

Callable[[], Sequence[Server]]

ascending canonical index.

Example

routing = general_flow_shop_routing(servers) routing() # Returns e.g., [server_1, server_4, server_5]

References

Oosterman, B., Land, M., & Gaalman, G. (2000). The influence of shop characteristics on workload control. International Journal of Production Economics, 68(1), 107-119. https://doi.org/10.1016/S0925-5273(99)00141-3

pure_flow_shop_routing

pure_flow_shop_routing(
    servers: Sequence[Server],
) -> Callable[[], Sequence[Server]]

Create a Pure Flow Shop (PFS) routing factory.

The Pure Flow Shop is the most directed benchmark shop: every order has a fixed routing length equal to the number of machines and visits all servers in the same fixed (directed) sequence (Oosterman, Land & Gaalman, 2000; Kasper, Land & Teunter, 2023). The routing is deterministic — every job shares the identical routing — so the factory ignores the RNG.

Note

Because every order visits every machine, the mean routing length is len(servers) rather than (len(servers) + 1) / 2 as for the pure job shop / general flow shop. Calibrate the arrival rate accordingly to hit a target utilization (see arrival_rate_for_utilization).

Parameters:

Name Type Description Default
servers Sequence[Server]

The servers visited, in the fixed order they are visited.

required

Returns:

Type Description
Callable[[], Sequence[Server]]

A callable that, when invoked, returns all servers in their given order.

Example

routing = pure_flow_shop_routing(servers) routing() # Always returns every server, in order

References

Oosterman, B., Land, M., & Gaalman, G. (2000). The influence of shop characteristics on workload control. International Journal of Production Economics, 68(1), 107-119. https://doi.org/10.1016/S0925-5273(99)00141-3

Distribution

Bases: Protocol

A callable random variate that also reports its analytical mean.

Any object that is callable with no arguments returning a float and exposes a mean property satisfies this protocol. The Router only needs the callable side (the Sampler contract); Scenario reads .mean to derive the arrival rate for a target utilization.

Exponential dataclass

Exponential distribution with rate rate (mean 1/rate).

Erlang dataclass

Erlang (Gamma with integer shape) distribution, mean shape/rate.

TruncatedErlang dataclass

Erlang truncated to [0, max_value] by rejection resampling.

The default shape=2 reproduces the classic truncated 2-Erlang service process. mean is the TRUE conditional mean E[X | X <= max_value], strictly below the nominal shape/rate whenever max_value is finite.

LogNormal dataclass

Lognormal distribution with underlying-normal params mu/sigma.

Uniform dataclass

Continuous uniform distribution on [low, high], mean (low+high)/2.

Deterministic dataclass

Degenerate distribution that always returns value.

arrival_rate_for_utilization

arrival_rate_for_utilization(
    target_utilization: float,
    *,
    n_servers: int,
    mean_routing_length: float,
    mean_processing_time: float = 1.0,
) -> float

Derive the Poisson arrival rate that yields a target shop utilization.

Shop utilization couples to the mean routing length through rho = lambda * E[L] * E[p] / M, where lambda is the order arrival rate, E[L] the mean routing length, E[p] the mean operation processing time, and M the number of machines. Inverting gives::

lambda = rho * M / (E[L] * E[p])

Because E[L] differs by shop type — M for a pure flow shop versus (M + 1) / 2 for the pure job shop / general flow shop — the arrival rate is not portable across shop types: a pure flow shop reusing a job-shop arrival rate is driven unstable (rho > 1). Use this helper to recompute the rate whenever the shop type, machine count, or processing-time mean changes.

Parameters:

Name Type Description Default
target_utilization float

Desired steady-state utilization rho in (0, 1].

required
n_servers int

Number of machines M.

required
mean_routing_length float

Mean number of operations per order E[L].

required
mean_processing_time float

Mean operation processing time E[p]. Defaults to 1.0 (the nominal 2-Erlang mean; see TruncatedErlang).

1.0

Returns:

Type Description
float

The arrival rate lambda (orders per time unit). Its reciprocal is

float

the mean inter-arrival time for an exponential arrival process.

Example

Pure flow shop, M = 6, rho = 0.90, E[p] = 1.0 -> mean IAT 1.111

round(1 / arrival_rate_for_utilization(0.9, n_servers=6, mean_routing_length=6), 3) 1.111

General flow shop / pure job shop, E[L] = 3.5 -> mean IAT 0.648

round(1 / arrival_rate_for_utilization(0.9, n_servers=6, mean_routing_length=3.5), 3) 0.648

twk_due_date

twk_due_date(
    allowance_factor: float,
) -> Callable[[Sequence[float]], float]

Create a Total Work Content (TWK) due-date offset rule.

The TWK procedure sets an order's due date proportional to its total work content: due_date = arrival_time + K * sum(p_ij), where K is the allowance factor and sum(p_ij) is the order's total processing time (Kasper, Land & Teunter, 2023). Larger, more work-intensive orders are granted proportionally more lead time than the flat-allowance rule.

The returned callable matches the Router due_date_rule contract: it receives the job's sampled operation processing times and returns the due-date offset (the allowance added to the arrival time).

Parameters:

Name Type Description Default
allowance_factor float

The constant K multiplying total work content.

required

Returns:

Type Description
Callable[[Sequence[float]], float]

A callable (processing_times) -> float returning K * sum(...).

Example

rule = twk_due_date(8.74) # FOCUS pure-job-shop K (6 work centres) rule([1.0, 2.0]) # offset for a 3.0 work-content order 26.22

References

Kasper, A., Land, M., & Teunter, R. (2023). Towards system state dispatching in high-variety manufacturing. Omega, 114, 102726. https://doi.org/10.1016/j.omega.2022.102726

RunningStats

Compute running mean, variance, and standard deviation using Welford's algorithm.

Welford's algorithm maintains numerical stability by avoiding catastrophic cancellation that occurs when computing variance via the naive formula (sum of squares minus squared sum). This is especially important for simulations with many observations or values of similar magnitude.

Attributes:

Name Type Description
n

Number of observations added.

mean

Current running mean of all observations.

M2

Sum of squared differences from the mean (internal state).

Example

stats = RunningStats() for value in [2.0, 4.0, 6.0]: ... stats.update(value) stats.mean 4.0 stats.std 2.0

M2 instance-attribute

M2 = 0.0

mean instance-attribute

mean = 0.0

n instance-attribute

n = 0

std property

std: float

Sample standard deviation (square root of variance).

variance property

variance: float

Sample variance using Bessel's correction (n-1 denominator).

__init__

__init__() -> None

Initialize statistics counters to zero.

update

update(x: float) -> None

Add a new observation and update running statistics.

Parameters:

Name Type Description Default
x float

The new value to incorporate into the statistics.

required

z_norm

z_norm(x: float) -> float

Compute the z-score (standard score) for a given value.

Parameters:

Name Type Description Default
x float

The value to normalize.

required

Returns:

Type Description
float

The z-score (x - mean) / std, or 0.0 if insufficient data

float

or zero standard deviation.

Logging

SimLogger

Per-environment logger with structured output and history buffer.

Each Environment instance has its own SimLogger, which: - Automatically includes simulation time in log output - Supports JSON or text output format - Maintains an in-memory history buffer - Supports per-component filtering

__slots__ class-attribute instance-attribute

__slots__ = (
    "_env",
    "_log_file",
    "_log_format",
    "_history",
    "_component_filters",
    "_handler_id",
    "_env_id",
    "_finalizer",
    "_db_store",
)

_component_filters instance-attribute

_component_filters: dict[str, bool] = {}

_db_store instance-attribute

_db_store: SQLiteEventStore | None = (
    SQLiteEventStore(db_path)
    if db_path is not None
    else None
)

_env instance-attribute

_env = env

_env_id instance-attribute

_env_id = hex

_finalizer instance-attribute

_finalizer = finalize(
    env, _finalize_logger, _handler_id, _db_store
)

_global_level class-attribute

_global_level: str = 'INFO'

_handler_id instance-attribute

_handler_id: int | None = None

_history instance-attribute

_history = EventHistoryBuffer(max_size=history_size)

_log_file instance-attribute

_log_file = Path(log_file) if log_file else None

_log_format instance-attribute

_log_format = log_format

db_enabled property

db_enabled: bool

Return whether SQLite storage is enabled.

env_id property

env_id: str

Return this environment's unique identifier.

history property

history: EventHistoryBuffer

Access the event history buffer.

__init__

__init__(
    *,
    env: Environment,
    log_file: str | Path | None = None,
    log_format: Literal["text", "json"] = "text",
    history_size: int = 1000,
    db_path: str | Path | None = None,
) -> None

Initialize a per-environment logger.

Parameters:

Name Type Description Default
env Environment

The simulation environment this logger is attached to

required
log_file str | Path | None

Optional file path for log output (defaults to stderr)

None
log_format Literal['text', 'json']

Output format ("text" or "json")

'text'
history_size int

Maximum number of events to keep in history buffer

1000
db_path str | Path | None

Optional SQLite database path for persistent event storage. If provided, events are stored in both memory buffer and SQLite.

None

_format_message

_format_message(record: dict[str, Any]) -> str

Format a log record for output.

_log

_log(
    level: str,
    message: str,
    *,
    component: str | None = None,
    **extra: Any,
) -> None

Internal logging method.

_make_sink

_make_sink() -> Any

Create a custom sink that handles formatting.

_setup_handler

_setup_handler() -> None

Configure loguru handler for this environment.

_should_log

_should_log(component: str | None) -> bool

Check if logging is enabled for this component.

_should_log_level

_should_log_level(level: str) -> bool

Check if the given level should be logged based on global level.

close

close() -> None

Clean up loguru handler and SQLite connection.

Should be called when the environment is no longer needed, especially important for multiprocessing scenarios.

debug

debug(
    message: str,
    *,
    component: str | None = None,
    **extra: Any,
) -> None

Log a debug message.

disable_component

disable_component(component: str) -> None

Disable logging for a specific component type.

Parameters:

Name Type Description Default
component str

Component class name (e.g., "Server", "ProductionJob")

required

enable_component

enable_component(component: str) -> None

Enable logging for a specific component type.

Parameters:

Name Type Description Default
component str

Component class name (e.g., "Server", "ProductionJob")

required

error

error(
    message: str,
    *,
    component: str | None = None,
    **extra: Any,
) -> None

Log an error message.

execute_sql

execute_sql(
    sql: str, params: tuple[Any, ...] = ()
) -> list[sqlite3.Row]

Execute arbitrary SQL query on the log database.

The database has a log_events table with columns: id, env_id, timestamp, level, message, component, extra, wall_time

Parameters:

Name Type Description Default
sql str

SQL query to execute.

required
params tuple[Any, ...]

Query parameters.

()

Returns:

Type Description
list[Row]

List of sqlite3.Row objects.

Raises:

Type Description
RuntimeError

If SQLite storage is not enabled.

get_level classmethod

get_level() -> str

Get current global log level.

info

info(
    message: str,
    *,
    component: str | None = None,
    **extra: Any,
) -> None

Log an info message.

query_sql

query_sql(
    *,
    level: str | None = None,
    component: str | None = None,
    since: float | None = None,
    until: float | None = None,
    limit: int | None = None,
    offset: int = 0,
) -> list[LogEvent]

Query events from SQLite storage.

Similar to history.query() but queries the persistent SQLite database. Only available when db_path was provided at initialization.

Parameters:

Name Type Description Default
level str | None

Filter by log level (e.g., "INFO", "ERROR").

None
component str | None

Filter by component name.

None
since float | None

Include events with timestamp >= since.

None
until float | None

Include events with timestamp <= until.

None
limit int | None

Maximum number of events to return.

None
offset int

Number of events to skip.

0

Returns:

Type Description
list[LogEvent]

List of matching LogEvent objects.

Raises:

Type Description
RuntimeError

If SQLite storage is not enabled.

set_level classmethod

set_level(level: str) -> None

Set global log level for all environments.

Parameters:

Name Type Description Default
level str

Log level name (DEBUG, INFO, WARNING, ERROR, CRITICAL)

required

warning

warning(
    message: str,
    *,
    component: str | None = None,
    **extra: Any,
) -> None

Log a warning message.

LogEvent dataclass

Immutable record of a logged event.

component class-attribute instance-attribute

component: str | None = None

extra class-attribute instance-attribute

extra: dict[str, Any] = field(default_factory=dict)

level instance-attribute

level: str

message instance-attribute

message: str

timestamp instance-attribute

timestamp: float

__init__

__init__(
    timestamp: float,
    level: str,
    message: str,
    component: str | None = None,
    extra: dict[str, Any] = dict(),
) -> None

EventHistoryBuffer

Fixed-size ring buffer for log events.

__slots__ class-attribute instance-attribute

__slots__ = ('_buffer',)

_buffer instance-attribute

_buffer: deque[LogEvent] = deque(maxlen=max_size)

__init__

__init__(max_size: int = 1000) -> None

__iter__

__iter__() -> Iterator[LogEvent]

__len__

__len__() -> int

append

append(event: LogEvent) -> None

Add an event to the buffer.

clear

clear() -> None

Clear all events from the buffer.

query

query(
    *,
    level: str | None = None,
    component: str | None = None,
    since: float | None = None,
    until: float | None = None,
) -> list[LogEvent]

Query history with optional filters.

Parameters:

Name Type Description Default
level str | None

Filter by log level (e.g., "INFO", "ERROR")

None
component str | None

Filter by component name (e.g., "Server")

None
since float | None

Include events with timestamp >= since

None
until float | None

Include events with timestamp <= until

None

Returns:

Type Description
list[LogEvent]

List of matching LogEvent objects

SQLiteEventStore

Thread-safe SQLite storage for log events.

Provides persistent storage of log events across simulation runs. Multiple environments can share a single database file, distinguished by the env_id column.

_SCHEMA class-attribute instance-attribute

_SCHEMA = "\n        CREATE TABLE IF NOT EXISTS log_events (\n            id INTEGER PRIMARY KEY AUTOINCREMENT,\n            env_id TEXT NOT NULL,\n            timestamp REAL NOT NULL,\n            level TEXT NOT NULL,\n            message TEXT NOT NULL,\n            component TEXT,\n            extra TEXT,\n            wall_time TEXT NOT NULL\n        );\n        CREATE INDEX IF NOT EXISTS idx_env_id ON log_events(env_id);\n        CREATE INDEX IF NOT EXISTS idx_timestamp ON log_events(env_id, timestamp);\n        CREATE INDEX IF NOT EXISTS idx_level ON log_events(env_id, level);\n        CREATE INDEX IF NOT EXISTS idx_component ON log_events(env_id, component);\n    "

__slots__ class-attribute instance-attribute

__slots__ = ('_db_path', '_conn', '_lock')

_conn instance-attribute

_conn: Connection = connect(
    str(_db_path), check_same_thread=False
)

_db_path instance-attribute

_db_path: Path = Path(db_path)

_lock instance-attribute

_lock: Lock = Lock()

__init__

__init__(db_path: str | Path) -> None

Open or create the SQLite database.

Parameters:

Name Type Description Default
db_path str | Path

Path to the SQLite database file.

required

_init_schema

_init_schema() -> None

Create tables and indexes if they don't exist.

close

close() -> None

Close the database connection.

execute_sql

execute_sql(
    sql: str, params: tuple[Any, ...] = ()
) -> list[sqlite3.Row]

Execute arbitrary SQL and return raw rows.

Parameters:

Name Type Description Default
sql str

SQL query to execute.

required
params tuple[Any, ...]

Query parameters.

()

Returns:

Type Description
list[Row]

List of sqlite3.Row objects.

insert

insert(
    env_id: str, event: LogEvent, wall_time: str
) -> None

Insert a single event.

Parameters:

Name Type Description Default
env_id str

Environment identifier.

required
event LogEvent

The LogEvent to store.

required
wall_time str

ISO8601 wall-clock time string.

required

query

query(
    env_id: str,
    *,
    level: str | None = None,
    component: str | None = None,
    since: float | None = None,
    until: float | None = None,
    limit: int | None = None,
    offset: int = 0,
) -> list[LogEvent]

Query events with optional filters.

Parameters:

Name Type Description Default
env_id str

Environment identifier to filter by.

required
level str | None

Filter by log level (e.g., "INFO", "ERROR").

None
component str | None

Filter by component name.

None
since float | None

Include events with timestamp >= since.

None
until float | None

Include events with timestamp <= until.

None
limit int | None

Maximum number of events to return.

None
offset int

Number of events to skip.

0

Returns:

Type Description
list[LogEvent]

List of matching LogEvent objects.