Utilities API
The builder functions are convenience wrappers that wire together an Environment, a
ShopFloor, a Router, and a release policy into a ready-to-run system, so you can spin up
a complete simulation in a single call. The distribution helpers sample inter-arrival times,
processing times, and due-date offsets, and RunningStats provides an online (Welford)
accumulator for mean and variance. SimLogger records simulation events, writes them as plain
text or JSON, and can also persist them to a SQLite store.
Builders
build_immediate_release_system
build_immediate_release_system(
*,
env: Environment,
scenario: Scenario = Scenario(),
priority_policies: Callable[..., float] | None = None,
collect_workload: bool = False,
collect_time_series: bool = False,
retain_job_history: bool = False,
) -> BuiltSystem[None]
Build an immediate release (push) system with no workload control.
Creates a simple push system where jobs enter the shopfloor immediately upon arrival without any release control.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| env | Environment | The simulation environment. | required |
| scenario | Scenario | Environment description (shop type, machine count, arrival process, service-time, due-date rule). Defaults to a 6-machine pure job shop at rho=0.90. | Scenario() |
| priority_policies | Callable[..., float] \| None | Optional callable used to assign job priorities at servers. | None |
| collect_workload | bool | If True, attach a | False |
| collect_time_series | bool | If True, servers collect queue length time series. | False |
| retain_job_history | bool | If True, servers retain completed job references. | False |
Returns:
| Type | Description |
|---|---|
| BuiltSystem[None] | The assembled system. The example unpacks it as (_, servers, shop_floor, router, _). |
Example
    >>> env = Environment()
    >>> _, servers, shop_floor, router, _ = build_immediate_release_system(env=env)
    >>> env.run(until=1000)
    >>> print(f"Jobs completed: {len(shop_floor.jobs_done)}")
build_focus_system
build_focus_system(
*,
env: Environment,
scenario: Scenario = Scenario(),
focus_weights: tuple[
float, float, float, float, float
] = (0.25, 0.25, 0.25, 0.25, 0.0),
collect_workload: bool = False,
) -> BuiltSystem[None]
Build an immediate-release (push) system that dispatches with FOCUS.
Jobs enter the shopfloor on arrival (no release control); queue ordering
at every server uses the FOCUS self-establishing rule (Kasper, Land,
Teunter 2023, Omega 114, 102726) via FocusPriorityRule. Use this to
study FOCUS as a standalone dispatching rule, independent of DRACO.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| env | Environment | The simulation environment. | required |
| scenario | Scenario | Environment description (shop type, machine count, arrival process, service-time, due-date rule). Defaults to a 6-machine pure job shop at rho=0.90. | Scenario() |
| focus_weights | tuple[float, float, float, float, float] | FOCUS mechanism weights | (0.25, 0.25, 0.25, 0.25, 0.0) |
| collect_workload | bool | If True, attach a | False |
Returns:
| Type | Description |
|---|---|
| BuiltSystem[None] | The assembled system. The example unpacks it as (_, servers, shop_floor, router, _). |
Example
    >>> env = Environment()
    >>> _, servers, shop_floor, router, _ = build_focus_system(env=env)
    >>> env.run(until=1000)
References
Kasper, A., Land, M., Teunter, R. (2023). Towards system state dispatching in high-variety manufacturing. Omega, 114, 102726. https://doi.org/10.1016/j.omega.2022.102726
build_lumscor_system
build_lumscor_system(
*,
env: Environment,
scenario: Scenario = Scenario(),
check_timeout: float,
wl_norm_level: float,
allowance_factor: int,
collect_workload: bool = False,
) -> BuiltSystem[LumsCor]
Build a LumsCor (load-based) pull system with workload control.
Creates a pull system using the LUMS COR (Lancaster University Management School Corrected Order Release) release policy. Jobs are held in a Pre-Shop Pool and released only while the affected server workloads stay at or below the configured norms.
Uses CorrectedWIPStrategy, which discounts downstream workload by routing position, and includes starvation-avoidance triggers for idle servers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| env | Environment | The simulation environment. | required |
| scenario | Scenario | Environment description (shop type, machine count, arrival process, service-time, due-date rule). Defaults to a 6-machine pure job shop at rho=0.90. | Scenario() |
| check_timeout | float | Time between pool release checks. | required |
| wl_norm_level | float | Workload norm threshold for each server. Jobs are released only if adding them keeps corrected WIP at or below this level. | required |
| allowance_factor | int | Buffer time per server for due date calculation. Higher values result in earlier (more conservative) releases. | required |
| collect_workload | bool | If True, attach a | False |
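Returns:
| Type | Description |
|---|---|
| BuiltSystem[LumsCor] | The assembled system, including the LumsCor policy instance. The example unpacks it as (psp, servers, shop_floor, router, policy). |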
Example
    >>> env = Environment()
    >>> psp, servers, shop_floor, router, policy = build_lumscor_system(
    ...     env=env, check_timeout=10.0, wl_norm_level=5.0, allowance_factor=2
    ... )
    >>> env.run(until=1000)
References
Land, M.J. (2006). Parameters and sensitivity in workload control. International Journal of Production Economics, 104(2), 625-638. https://doi.org/10.1016/j.ijpe.2005.03.001
build_slar_system
build_slar_system(
*,
env: Environment,
scenario: Scenario = Scenario(),
allowance_factor: float,
collect_workload: bool = False,
) -> BuiltSystem[Slar]
Build a SLAR (Superfluous Load Avoidance Release) pull system.
Creates a pull system using the SLAR release policy, which is based on planned start times (PST). Jobs are released from the Pre-Shop Pool when a server risks starvation or when an urgent job needs to be inserted.
Release triggers
- Starvation avoidance: When queue is empty or has one job, release the job with earliest planned start time.
- Urgent job insertion: When all queued jobs are non-urgent, insert the most urgent job with shortest processing time.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| env | Environment | The simulation environment. | required |
| allowance_factor | float | Slack allowance per operation (parameter 'k' in paper). Higher values provide more buffer time per server. | required |
| scenario | Scenario | Environment description (shop type, machine count, arrival process, service-time, due-date rule). Defaults to a 6-machine pure job shop at rho=0.90. | Scenario() |
| collect_workload | bool | If True, attach a | False |
Returns:
| Type | Description |
|---|---|
| BuiltSystem[Slar] | Unpacks as (psp, servers, shop_floor, router, policy). |
Example
    >>> env = Environment()
    >>> psp, servers, shop_floor, router, policy = build_slar_system(
    ...     env=env, allowance_factor=3.0
    ... )
    >>> env.run(until=1000)
References
Land, M.J. & Gaalman, G.J.C. (1998). The performance of workload control concepts in job shops: Improving the release method. International Journal of Production Economics, 56-57, 347-364. https://doi.org/10.1016/S0925-5273(98)00052-8
build_slar_limit_system
build_slar_limit_system(
*,
env: Environment,
scenario: Scenario = Scenario(),
allowance_factor: float,
wl_norm_level: float,
collect_workload: bool = False,
) -> BuiltSystem[SlarLimit]
Build a SLAR-Limit (load-bounded SLAR) pull system.
Creates a pull system using SLAR-Limit release policy. SLAR-Limit extends classic SLAR by adding a workload-norm limit to the urgent insertion branch: urgent PSP candidates are iterated in ascending SPT order and the first whose corrected workload contribution PT/(i+1) fits all server norms is released. If none fits, the policy falls back to the standard SLAR postponed-starvation branch.
Uses CorrectedWIPStrategy on the shopfloor (required by the norm check).
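The urgent-insertion branch described above can be sketched as follows. This is a minimal illustration, not the library's implementation: PoolJob, corrected_contributions, and pick_urgent_release are hypothetical names, the routing position i is assumed to be zero-based, and "ascending SPT order" is assumed to mean sorting by the processing time of a job's first operation.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class PoolJob:
    """Hypothetical pool entry: routing is (server, processing time) in visit order."""
    job_id: str
    routing: tuple[tuple[str, float], ...]
    urgent: bool


def corrected_contributions(job: PoolJob) -> dict[str, float]:
    # Corrected load: the operation at routing position i contributes PT / (i + 1).
    return {server: pt / (i + 1) for i, (server, pt) in enumerate(job.routing)}


def pick_urgent_release(
    pool: list[PoolJob], loads: dict[str, float], norm: float
) -> Optional[PoolJob]:
    # Assumption: SPT order means ascending first-operation processing time.
    urgent = sorted((j for j in pool if j.urgent), key=lambda j: j.routing[0][1])
    for job in urgent:
        contributions = corrected_contributions(job)
        # Release the first candidate that keeps every server in its routing
        # at or below the norm.
        if all(loads.get(s, 0.0) + c <= norm for s, c in contributions.items()):
            return job
    return None  # Caller would fall back to the starvation branch.


loads = {"A": 4.5, "B": 1.0, "C": 0.0}
pool = [
    PoolJob("j1", (("A", 1.0), ("B", 2.0)), urgent=True),   # A would reach 5.5
    PoolJob("j2", (("B", 1.5), ("A", 1.0)), urgent=True),   # A would reach 5.0
    PoolJob("j3", (("C", 0.5),), urgent=False),             # not urgent, ignored
]
chosen = pick_urgent_release(pool, loads, norm=5.0)
print(chosen.job_id if chosen else None)
```

With a norm of 5.0 the first SPT candidate overloads server A, so the second candidate is the one released; with a tighter norm no candidate fits and the function returns None.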
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| env | Environment | The simulation environment. | required |
| allowance_factor | float | Slack allowance per operation (parameter 'k' in the SLAR paper). | required |
| wl_norm_level | float | Workload norm threshold applied uniformly to every server. An urgent PSP candidate is released only if adding its corrected contribution keeps every server in its routing at or below this level. | required |
| scenario | Scenario | Environment description (shop type, machine count, arrival process, service-time, due-date rule). Defaults to a 6-machine pure job shop at rho=0.90. | Scenario() |
| collect_workload | bool | If True, attach a | False |
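Returns:
| Type | Description |
|---|---|
| BuiltSystem[SlarLimit] | The assembled system, including the SlarLimit policy instance. The example unpacks it as (psp, servers, shop_floor, router, policy). |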
Example
    >>> env = Environment()
    >>> psp, servers, shop_floor, router, policy = build_slar_limit_system(
    ...     env=env, allowance_factor=3.0, wl_norm_level=5.0
    ... )
    >>> env.run(until=1000)
References
Thürer, M. & Stevenson, M. (2021). Improving superfluous load avoidance release (SLAR): A new load-based SLAR mechanism. International Journal of Production Economics, 231, 107881. https://doi.org/10.1016/j.ijpe.2020.107881
build_draco_system
build_draco_system(
*,
env: Environment,
scenario: Scenario = Scenario(),
wip_target: int,
loop_target: int,
focus_weights: tuple[
float, float, float, float, float
] = (0.25, 0.25, 0.25, 0.25, 0.0),
total_impact_weights: tuple[float, float, float] = (
0.25,
0.25,
0.5,
),
collect_workload: bool = False,
) -> BuiltSystem[Draco]
Build a DRACO (non-hierarchical WIP control) pull system.
Creates a pull system using the DRACO policy (Kasper, Land, Teunter
2023, IJPE 257, 108768) that merges release, authorization, and
dispatching into a single per-server decision. On every job
completion at any server k, DRACO scores every candidate in
Q_k ∪ P_k by a weighted total impact w^R·R + w^A·A + w^D·D
and selects the maximum. The winner is dispatched to k next.
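The selection step in the paragraph above is an argmax over the union of queue and pool candidates. The sketch below illustrates only that step and is not the Draco class: Candidate and select_next are hypothetical names, the three impact values are placeholder numbers rather than the paper's impact measures, and the weight order (release, authorization, dispatching) is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Candidate:
    """Hypothetical candidate with precomputed R, A, D impact values."""
    job_id: str
    source: str                   # "queue" (Q_k) or "pool" (P_k)
    release_impact: float         # R
    authorization_impact: float   # A
    dispatching_impact: float     # D


def total_impact(c: Candidate, weights: tuple[float, float, float]) -> float:
    # Assumed order of weights: (w^R, w^A, w^D).
    w_r, w_a, w_d = weights
    return (w_r * c.release_impact
            + w_a * c.authorization_impact
            + w_d * c.dispatching_impact)


def select_next(
    queue: list[Candidate],
    pool: list[Candidate],
    weights: tuple[float, float, float] = (0.25, 0.25, 0.5),
) -> Optional[Candidate]:
    candidates = [*queue, *pool]  # Q_k union P_k
    if not candidates:
        return None
    return max(candidates, key=lambda c: total_impact(c, weights))


queue = [Candidate("j1", "queue", 0.2, 0.4, 0.6)]
pool = [
    Candidate("j2", "pool", 0.9, 0.1, 0.3),
    Candidate("j3", "pool", 0.5, 0.5, 0.5),
]
winner = select_next(queue, pool)
print(winner.job_id, winner.source)
```

Changing the weights changes the winner: putting all weight on the release term, for example, favors the candidate with the largest R value regardless of where it currently waits.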
Wiring (performed by Draco.__init__):
- priority_policy: Draco.priority_policy (queue-side
DRACO score; returns -inf one-shot for forced PSP winners
to preserve strict paper semantics).
- shop_floor.on_processing_end: Draco.decide_next_job.
- psp.on_arrival(starvation_avoidance): prevents idle-server
starvation when a new arrival's first server is idle.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| env | Environment | The simulation environment. | required |
| wip_target | int | Target shop WIP | required |
| loop_target | int | Target overlapping loop | required |
| focus_weights | tuple[float, float, float, float, float] | FOCUS mechanism weights | (0.25, 0.25, 0.25, 0.25, 0.0) |
| total_impact_weights | tuple[float, float, float] | | (0.25, 0.25, 0.5) |
| scenario | Scenario | Environment description (shop type, machine count, arrival process, service-time, due-date rule). Defaults to a 6-machine pure job shop at rho=0.90. | Scenario() |
| collect_workload | bool | If True, attach a | False |
Returns:
| Type | Description |
|---|---|
| BuiltSystem[Draco] | Unpacks as (psp, servers, shop_floor, router, policy). |
Example
    >>> env = Environment()
    >>> psp, servers, shop_floor, router, policy = build_draco_system(
    ...     env=env, wip_target=8, loop_target=4
    ... )
    >>> env.run(until=1000)
References
Kasper, A., Land, M., Teunter, R. (2023). Non-hierarchical work-in-progress control in manufacturing. International Journal of Production Economics, 257, 108768. https://doi.org/10.1016/j.ijpe.2022.108768
Scenario
Scenario
dataclass
Immutable description of a shop environment and its order stream.
Attributes:
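| Name | Type | Description |
|---|---|---|
| arrival_process | Callable[[float], Callable[[], float]] | Factory producing the inter-arrival sampler. It is called with the resolved arrival rate (orders per time unit, from resolved_arrival_rate). |
| arrival_rate | float \| None | Explicit arrival rate (orders per time unit) that overrides the mix-weighted derivation when not None. |
| target_utilization | float | Target steady-state shop utilization ρ, expressed as a fraction. |
| families | tuple[SkuFamily, ...] | The product mix (one or more SkuFamily entries). |
| due_date_offset | Distribution | Default due-date offset distribution applied to families that do not set their own. |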
arrival_process
class-attribute
instance-attribute
arrival_process: Callable[[float], Callable[[], float]] = (
Exponential
)
arrival_rate
class-attribute
instance-attribute
arrival_rate: float | None = None
due_date_offset
class-attribute
instance-attribute
due_date_offset: Distribution = Uniform(low=30.0, high=45.0)
families
class-attribute
instance-attribute
families: tuple[SkuFamily, ...] = (SkuFamily(),)
n_servers
class-attribute
instance-attribute
n_servers: int = 6
shop_type
class-attribute
instance-attribute
shop_type: ShopType = PJS
target_utilization
class-attribute
instance-attribute
target_utilization: float = 0.9
__init__
__init__(
shop_type: ShopType = ShopType.PJS,
n_servers: int = 6,
target_utilization: float = 0.9,
families: tuple[SkuFamily, ...] = (SkuFamily(),),
due_date_offset: Distribution = Uniform(
low=30.0, high=45.0
),
arrival_process: Callable[
[float], Callable[[], float]
] = Exponential,
arrival_rate: float | None = None,
) -> None
__post_init__
__post_init__() -> None
build_floor
build_floor(
env: Environment,
*,
collect_workload: bool = False,
collect_time_series: bool = False,
retain_job_history: bool = False,
) -> tuple[ShopFloor, tuple[Server, ...]]
Create the ShopFloor and n_servers single-capacity servers.
build_router
build_router(
env: Environment,
shop_floor: ShopFloor,
servers: Sequence[Server],
*,
psp: PreShopPool | None,
priority_policies: Callable[..., float] | None = None,
) -> Router
Assemble the Router from the family mix: arrival process, per-family routing, service-time distributions, and due-date offsets/rules.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| env | Environment | The simulation environment. | required |
| shop_floor | ShopFloor | The shop floor the router feeds jobs into. | required |
| servers | Sequence[Server] | The server pool the routings and service-time maps are built over. Its length must equal n_servers. | required |
| psp | PreShopPool \| None | The pre-shop pool, or None. | required |
| priority_policies | Callable[..., float] \| None | Optional priority policy callable for the router. | None |
Raises:
| Type | Description |
|---|---|
|
If |
general_flow_shop
classmethod
general_flow_shop(**overrides: object) -> Scenario
General Flow Shop preset (directed/sorted routing).
pure_flow_shop
classmethod
pure_flow_shop(**overrides: object) -> Scenario
Pure Flow Shop preset (all machines, fixed order).
pure_job_shop
classmethod
pure_job_shop(**overrides: object) -> Scenario
Pure Job Shop preset (undirected routing).
resolved_arrival_rate
resolved_arrival_rate() -> float
The arrival rate used by the scenario: the explicit arrival_rate override if set, otherwise the mix-weighted derivation.
The returned rate (orders per time unit) is fed to arrival_process in
build_router, which expects a sampler of inter-arrival times with mean
1 / rate (the default Exponential satisfies this).
single
classmethod
single(
*,
service_time: Distribution | None = None,
due_date_offset: Distribution | None = None,
twk_allowance_factor: float | None = None,
name: str = "F1",
**shop: object,
) -> Scenario
Convenience for the common one-product case: build a single-family Scenario.
name is always forwarded (it has a default), while service_time,
due_date_offset, and twk_allowance_factor are forwarded only when
non-None, so SkuFamily's own defaults fill the rest. **shop
forwards shop-level kwargs (shop_type, n_servers,
target_utilization, arrival_rate, ...).
ShopType
Bases:
Standard workload-control benchmark shop types (by routing directedness).
Distributions
pure_job_shop_routing
pure_job_shop_routing(
servers: Sequence[Server],
) -> Callable[[], Sequence[Server]]
Create a Pure Job Shop (PJS) routing factory.
The Pure Job Shop — also called a randomly routed job shop (Conway et al.,
1967) — is the least directed of the standard workload-control benchmark
shops: each order has a random routing length and a random routing
direction (Oosterman, Land & Gaalman, 2000; Kasper, Land & Teunter, 2023).
The factory draws a routing length k uniformly from [1, len(servers)]
and returns k distinct servers in random order (sampling without
replacement, so re-entrant flow is prohibited).
See general_flow_shop_routing (same length rule, but the subset is sorted
into a directed flow) and pure_flow_shop_routing (fixed length, fully
directed) for the directed counterparts on the directedness spectrum.
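The sampling rule above (uniform routing length, then distinct servers in random order) can be sketched with the standard library. This is an illustrative stand-in rather than the library function: pjs_routing_sketch is a hypothetical name, servers are represented by plain strings, and the explicit rng argument is an assumption added to make the example reproducible.

```python
import random
from collections.abc import Callable, Sequence
from typing import Optional


def pjs_routing_sketch(
    servers: Sequence[str], rng: Optional[random.Random] = None
) -> Callable[[], list[str]]:
    rng = rng or random.Random()
    pool = list(servers)

    def routing() -> list[str]:
        k = rng.randint(1, len(pool))   # routing length, uniform on [1, M] inclusive
        return rng.sample(pool, k)      # k distinct servers, in random order

    return routing


servers = [f"server_{i}" for i in range(1, 7)]
routing = pjs_routing_sketch(servers, random.Random(42))
routes = [routing() for _ in range(2000)]
mean_length = sum(len(r) for r in routes) / len(routes)
print(f"mean routing length ~ {mean_length:.2f} (theory: {(len(servers) + 1) / 2})")
```

Because random.sample draws without replacement, no server appears twice in a routing, which is what rules out re-entrant flow.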
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| servers | Sequence[Server] | The pool of servers to sample from. Its order defines the canonical machine index used by the directed sibling factories. | required |
Returns:
| Type | Description |
|---|---|
| Callable[[], Sequence[Server]] | A callable that, when invoked, returns a random subset of servers (between 1 and len(servers) inclusive) in random order, without replacement. |
Example
    >>> routing = pure_job_shop_routing(servers)
    >>> routing()  # Returns e.g., [server_2, server_5, server_1]
References
Oosterman, B., Land, M., & Gaalman, G. (2000). The influence of shop characteristics on workload control. International Journal of Production Economics, 68(1), 107-119. https://doi.org/10.1016/S0925-5273(99)00141-3
general_flow_shop_routing
general_flow_shop_routing(
servers: Sequence[Server],
) -> Callable[[], Sequence[Server]]
Create a General Flow Shop (GFS) routing factory.
The General Flow Shop is the directed counterpart of the Pure Job Shop:
each order has the same random routing length k ~ U[1, len(servers)] and
the same equal per-machine inclusion probability, but the selected machines
are sorted into ascending index order so that orders flow in a single
direction with typical upstream and downstream stations (Oosterman, Land &
Gaalman, 2000; Kasper, Land & Teunter, 2023). Machines are drawn without
replacement, so re-entrant flow is prohibited.
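A minimal sketch of the directed variant, under the same assumptions as a plain standard-library stand-in (gfs_routing_sketch is a hypothetical name, servers are strings, and the rng argument is added for reproducibility): draw the subset exactly as for the pure job shop, then sort it by canonical index.

```python
import random
from collections.abc import Callable, Sequence
from typing import Optional


def gfs_routing_sketch(
    servers: Sequence[str], rng: Optional[random.Random] = None
) -> Callable[[], list[str]]:
    rng = rng or random.Random()
    indices = range(len(servers))

    def routing() -> list[str]:
        k = rng.randint(1, len(servers))       # same length rule as the job shop
        chosen = sorted(rng.sample(indices, k))  # distinct indices, ascending order
        return [servers[i] for i in chosen]

    return routing


servers = [f"server_{i}" for i in range(1, 7)]
routing = gfs_routing_sketch(servers, random.Random(7))
routes = [routing() for _ in range(4000)]

# Each machine should appear in roughly E[k] / M = 3.5 / 6 of all routings.
inclusion = {s: sum(s in r for r in routes) / len(routes) for s in servers}
print({s: round(p, 2) for s, p in inclusion.items()})
```

Sorting changes only the visiting order, not which machines are selected, so the inclusion probability per machine stays equal across the shop.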
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| servers | Sequence[Server] | The pool of servers to sample from. Their order defines the canonical machine index along which routings are directed. | required |
Returns:
| Type | Description |
|---|---|
| Callable[[], Sequence[Server]] | A callable that, when invoked, returns a random subset of servers (between 1 and len(servers) inclusive), distinct and ordered by ascending canonical index. |
Example
    >>> routing = general_flow_shop_routing(servers)
    >>> routing()  # Returns e.g., [server_1, server_4, server_5]
References
Oosterman, B., Land, M., & Gaalman, G. (2000). The influence of shop characteristics on workload control. International Journal of Production Economics, 68(1), 107-119. https://doi.org/10.1016/S0925-5273(99)00141-3
pure_flow_shop_routing
pure_flow_shop_routing(
servers: Sequence[Server],
) -> Callable[[], Sequence[Server]]
Create a Pure Flow Shop (PFS) routing factory.
The Pure Flow Shop is the most directed benchmark shop: every order has a fixed routing length equal to the number of machines and visits all servers in the same fixed (directed) sequence (Oosterman, Land & Gaalman, 2000; Kasper, Land & Teunter, 2023). The routing is deterministic — every job shares the identical routing — so the factory ignores the RNG.
Note
Because every order visits every machine, the mean routing length is
len(servers) rather than (len(servers) + 1) / 2 as for the pure
job shop / general flow shop. Calibrate the arrival rate accordingly to
hit a target utilization (see arrival_rate_for_utilization).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| servers | Sequence[Server] | The servers visited, in the fixed order they are visited. | required |
Returns:
| Type | Description |
|---|---|
| Callable[[], Sequence[Server]] | A callable that, when invoked, returns all servers in their given order. |
Example
    >>> routing = pure_flow_shop_routing(servers)
    >>> routing()  # Always returns every server, in order
References
Oosterman, B., Land, M., & Gaalman, G. (2000). The influence of shop characteristics on workload control. International Journal of Production Economics, 68(1), 107-119. https://doi.org/10.1016/S0925-5273(99)00141-3
Distribution
Bases:
A callable random variate that also reports its analytical mean.
Any object that is callable with no arguments returning a float and
exposes a mean property satisfies this protocol. The Router only
needs the callable side (the Sampler contract); Scenario reads
.mean to derive the arrival rate for a target utilization.
Exponential
dataclass
Exponential distribution with rate rate (mean 1/rate).
Erlang
dataclass
Erlang (Gamma with integer shape) distribution, mean shape/rate.
TruncatedErlang
dataclass
Erlang truncated to [0, max_value] by rejection resampling.
The default shape=2 reproduces the classic truncated 2-Erlang service
process. mean is the TRUE conditional mean E[X | X <= max_value],
strictly below the nominal shape/rate whenever max_value is finite.
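To illustrate why the conditional mean falls below shape/rate, the sketch below pairs rejection resampling with the closed-form mean of a truncated Erlang. It is not the library's dataclass: all function names are hypothetical, and the closed form relies on the standard identity E[X; X <= c] = (shape/rate) * F_{shape+1}(c), where F_k is the Erlang CDF with integer shape k.

```python
import math
import random


def erlang_cdf(x: float, shape: int, rate: float) -> float:
    """CDF of an Erlang(shape, rate) variate at x."""
    t = rate * x
    return 1.0 - math.exp(-t) * sum(t**n / math.factorial(n) for n in range(shape))


def truncated_erlang_mean(shape: int, rate: float, max_value: float) -> float:
    """E[X | X <= max_value] for X ~ Erlang(shape, rate)."""
    return (shape / rate) * (
        erlang_cdf(max_value, shape + 1, rate) / erlang_cdf(max_value, shape, rate)
    )


def sample_truncated_erlang(
    shape: int, rate: float, max_value: float, rng: random.Random
) -> float:
    # Rejection resampling: redraw until the variate falls inside [0, max_value].
    while True:
        x = rng.gammavariate(shape, 1.0 / rate)  # gammavariate takes (shape, scale)
        if x <= max_value:
            return x


rng = random.Random(123)
shape, rate, max_value = 2, 2.0, 2.0
samples = [sample_truncated_erlang(shape, rate, max_value, rng) for _ in range(20000)]
sample_mean = sum(samples) / len(samples)
analytic = truncated_erlang_mean(shape, rate, max_value)
print(f"nominal {shape / rate:.3f}, analytic {analytic:.3f}, sampled {sample_mean:.3f}")
```

The gap between the nominal and conditional mean matters when an arrival rate is calibrated from the mean processing time: using the nominal value would overstate the workload each order brings.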
LogNormal
dataclass
Lognormal distribution with underlying-normal params mu/sigma.
Uniform
dataclass
Continuous uniform distribution on [low, high], mean (low+high)/2.
Deterministic
dataclass
Degenerate distribution that always returns value.
arrival_rate_for_utilization
arrival_rate_for_utilization(
target_utilization: float,
*,
n_servers: int,
mean_routing_length: float,
mean_processing_time: float = 1.0,
) -> float
Derive the Poisson arrival rate that yields a target shop utilization.
Shop utilization couples to the mean routing length through
rho = lambda * E[L] * E[p] / M, where lambda is the order arrival
rate, E[L] the mean routing length, E[p] the mean operation
processing time, and M the number of machines. Inverting gives:
    lambda = rho * M / (E[L] * E[p])
Because E[L] differs by shop type — M for a pure flow shop versus
(M + 1) / 2 for the pure job shop / general flow shop — the arrival rate
is not portable across shop types: a pure flow shop reusing a job-shop
arrival rate is driven unstable (rho > 1). Use this helper to recompute
the rate whenever the shop type, machine count, or processing-time mean
changes.
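The inversion and the portability warning above can be checked numerically. The sketch below re-implements the formula under hypothetical names (sketch_arrival_rate, implied_utilization) instead of calling the library, so it illustrates only the arithmetic.

```python
def sketch_arrival_rate(
    rho: float,
    n_servers: int,
    mean_routing_length: float,
    mean_processing_time: float = 1.0,
) -> float:
    """Invert rho = lambda * E[L] * E[p] / M for lambda."""
    return rho * n_servers / (mean_routing_length * mean_processing_time)


def implied_utilization(
    lam: float,
    n_servers: int,
    mean_routing_length: float,
    mean_processing_time: float = 1.0,
) -> float:
    """Forward formula: rho = lambda * E[L] * E[p] / M."""
    return lam * mean_routing_length * mean_processing_time / n_servers


M = 6
job_shop_rate = sketch_arrival_rate(0.9, M, (M + 1) / 2)  # E[L] = 3.5
flow_shop_rate = sketch_arrival_rate(0.9, M, M)           # E[L] = 6

# Reusing the job-shop rate in a pure flow shop overloads the shop.
overloaded = implied_utilization(job_shop_rate, M, M)
print(round(1 / flow_shop_rate, 3), round(1 / job_shop_rate, 3), round(overloaded, 3))
```

The last value exceeds 1, which is the instability the paragraph warns about: the same arrival stream brings 6 operations per order instead of 3.5 on average.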
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| target_utilization | float | Desired steady-state utilization | required |
| n_servers | int | Number of machines | required |
| mean_routing_length | float | Mean number of operations per order | required |
| mean_processing_time | float | Mean operation processing time | 1.0 |
Returns:
| Type | Description |
|---|---|
| float | The arrival rate lambda. Its reciprocal, 1 / lambda, is the mean inter-arrival time for an exponential arrival process. |
Example
Pure flow shop, M = 6, rho = 0.90, E[p] = 1.0 -> mean IAT 1.111
    >>> round(1 / arrival_rate_for_utilization(0.9, n_servers=6, mean_routing_length=6), 3)
    1.111
General flow shop / pure job shop, E[L] = 3.5 -> mean IAT 0.648
    >>> round(1 / arrival_rate_for_utilization(0.9, n_servers=6, mean_routing_length=3.5), 3)
    0.648
twk_due_date
twk_due_date(
allowance_factor: float,
) -> Callable[[Sequence[float]], float]
Create a Total Work Content (TWK) due-date offset rule.
The TWK procedure sets an order's due date proportional to its total work
content: due_date = arrival_time + K * sum(p_ij), where K is the
allowance factor and sum(p_ij) is the order's total processing time
(Kasper, Land & Teunter, 2023). Larger, more work-intensive orders are
granted proportionally more lead time than under a flat-allowance rule.
The returned callable matches the Router due_date_rule contract: it
receives the job's sampled operation processing times and returns the
due-date offset (the allowance added to the arrival time).
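The rule is a small closure over the allowance factor. The sketch below uses a hypothetical name (twk_rule_sketch) and shows how the returned offset combines with an arrival time; the arrival time of 100.0 is an arbitrary illustrative value.

```python
from collections.abc import Callable, Sequence


def twk_rule_sketch(allowance_factor: float) -> Callable[[Sequence[float]], float]:
    """Return a rule mapping operation processing times to a due-date offset."""

    def offset(processing_times: Sequence[float]) -> float:
        # K times the order's total work content.
        return allowance_factor * sum(processing_times)

    return offset


rule = twk_rule_sketch(8.74)
arrival_time = 100.0                         # illustrative value only
due_date = arrival_time + rule([1.0, 2.0])   # arrival_time + K * sum(p_ij)
print(round(due_date, 2))
```

Returning an offset instead of an absolute due date keeps the rule independent of the simulation clock, so the caller decides which arrival time to add.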
Parameters:
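| Name | Type | Description | Default |
|---|---|---|---|
| allowance_factor | float | The constant K. | required |
Returns:
| Type | Description |
|---|---|
| Callable[[Sequence[float]], float] | A callable that maps a job's operation processing times to its due-date offset. |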
Example
    >>> rule = twk_due_date(8.74)  # FOCUS pure-job-shop K (6 work centres)
    >>> rule([1.0, 2.0])  # offset for a 3.0 work-content order
    26.22
References
Kasper, A., Land, M., & Teunter, R. (2023). Towards system state dispatching in high-variety manufacturing. Omega, 114, 102726. https://doi.org/10.1016/j.omega.2022.102726
RunningStats
Compute running mean, variance, and standard deviation using Welford's algorithm.
Welford's algorithm maintains numerical stability by avoiding catastrophic cancellation that occurs when computing variance via the naive formula (sum of squares minus squared sum). This is especially important for simulations with many observations or values of similar magnitude.
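A compact sketch of the update rule, written as a stand-alone class (WelfordSketch is a hypothetical name, not the library's RunningStats), next to the naive formula it avoids. The large offset of 1e9 is chosen to provoke cancellation in the naive version; its exact output depends on floating-point rounding, so none is shown.

```python
import math


class WelfordSketch:
    """Online mean and sample variance via Welford's update."""

    def __init__(self) -> None:
        self.n = 0
        self.mean = 0.0
        self.m2 = 0.0  # running sum of squared deviations from the mean

    def update(self, x: float) -> None:
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)  # uses the already-updated mean

    @property
    def variance(self) -> float:
        return self.m2 / (self.n - 1) if self.n > 1 else 0.0

    @property
    def std(self) -> float:
        return math.sqrt(self.variance)


def naive_variance(values: list[float]) -> float:
    # Sum of squares minus squared sum: subtracts two huge, nearly equal numbers.
    n = len(values)
    total = sum(values)
    total_sq = sum(v * v for v in values)
    return (total_sq - total * total / n) / (n - 1)


values = [1e9 + v for v in (2.0, 4.0, 6.0)]
acc = WelfordSketch()
for v in values:
    acc.update(v)

print("welford:", acc.variance)
print("naive:  ", naive_variance(values))
```

Shifting every observation by a constant should leave the variance unchanged; the Welford accumulator preserves that property here, while the naive formula is not guaranteed to.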
Attributes:
| Name | Type | Description |
|---|---|---|
| n | int | Number of observations added. |
| mean | float | Current running mean of all observations. |
| M2 | float | Sum of squared differences from the mean (internal state). |
Example
    >>> stats = RunningStats()
    >>> for value in [2.0, 4.0, 6.0]:
    ...     stats.update(value)
    >>> stats.mean
    4.0
    >>> stats.std
    2.0
M2
instance-attribute
M2 = 0.0
mean
instance-attribute
mean = 0.0
n
instance-attribute
n = 0
std
property
std: float
Sample standard deviation (square root of variance).
variance
property
variance: float
Sample variance using Bessel's correction (n-1 denominator).
__init__
__init__() -> None
Initialize statistics counters to zero.
update
update(x: float) -> None
Add a new observation and update running statistics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| x | float | The new value to incorporate into the statistics. | required |
z_norm
z_norm(x: float) -> float
Compute the z-score (standard score) for a given value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| x | float | The value to normalize. | required |
Returns:
| Type | Description |
|---|---|
| float | The z-score (x - mean) / std, or 0.0 if there is insufficient data or the standard deviation is zero. |
Logging
SimLogger
Per-environment logger with structured output and history buffer.
Each Environment instance has its own SimLogger, which:
- Automatically includes simulation time in log output
- Supports JSON or text output format
- Maintains an in-memory history buffer
- Supports per-component filtering
__slots__
class-attribute
instance-attribute
__slots__ = (
"_env",
"_log_file",
"_log_format",
"_history",
"_component_filters",
"_handler_id",
"_env_id",
"_finalizer",
"_db_store",
)
_component_filters
instance-attribute
_component_filters: dict[str, bool] = {}
_db_store
instance-attribute
_db_store: SQLiteEventStore | None = (
SQLiteEventStore(db_path)
if db_path is not None
else None
)
_env
instance-attribute
_env = env
_env_id
instance-attribute
_env_id = hex
_finalizer
instance-attribute
_finalizer = finalize(
env, _finalize_logger, _handler_id, _db_store
)
_global_level
class-attribute
_global_level: str = 'INFO'
_handler_id
instance-attribute
_handler_id: int | None = None
_history
instance-attribute
_history = EventHistoryBuffer(max_size=history_size)
_log_file
instance-attribute
_log_file = Path(log_file) if log_file else None
_log_format
instance-attribute
_log_format = log_format
db_enabled
property
db_enabled: bool
Return whether SQLite storage is enabled.
env_id
property
env_id: str
Return this environment's unique identifier.
history
property
history: EventHistoryBuffer
Access the event history buffer.
__init__
__init__(
*,
env: Environment,
log_file: str | Path | None = None,
log_format: Literal["text", "json"] = "text",
history_size: int = 1000,
db_path: str | Path | None = None,
) -> None
Initialize a per-environment logger.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| env | Environment | The simulation environment this logger is attached to. | required |
| log_file | str \| Path \| None | Optional file path for log output (defaults to stderr). | None |
| log_format | Literal["text", "json"] | Output format ("text" or "json"). | 'text' |
| history_size | int | Maximum number of events to keep in the history buffer. | 1000 |
| db_path | str \| Path \| None | Optional SQLite database path for persistent event storage. If provided, events are stored in both the memory buffer and SQLite. | None |
_format_message
_format_message(record: dict[str, Any]) -> str
Format a log record for output.
_log
_log(
level: str,
message: str,
*,
component: str | None = None,
**extra: Any,
) -> None
Internal logging method.
_make_sink
_make_sink() -> Any
Create a custom sink that handles formatting.
_setup_handler
_setup_handler() -> None
Configure loguru handler for this environment.
_should_log
_should_log(component: str | None) -> bool
Check if logging is enabled for this component.
_should_log_level
_should_log_level(level: str) -> bool
Check if the given level should be logged based on global level.
close
close() -> None
Clean up loguru handler and SQLite connection.
Call this when the environment is no longer needed; this is especially important in multiprocessing scenarios.
debug
debug(
message: str,
*,
component: str | None = None,
**extra: Any,
) -> None
Log a debug message.
disable_component
disable_component(component: str) -> None
Disable logging for a specific component type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| component | str | Component class name (e.g., "Server", "ProductionJob") | required |
enable_component
enable_component(component: str) -> None
Enable logging for a specific component type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| component | str | Component class name (e.g., "Server", "ProductionJob") | required |
error
error(
message: str,
*,
component: str | None = None,
**extra: Any,
) -> None
Log an error message.
execute_sql
execute_sql(
sql: str, params: tuple[Any, ...] = ()
) -> list[sqlite3.Row]
Execute arbitrary SQL query on the log database.
The database has a log_events table with columns:
id, env_id, timestamp, level, message, component, extra, wall_time
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sql | str | SQL query to execute. | required |
| params | tuple[Any, ...] | Query parameters. | () |
Returns:
| Type | Description |
|---|---|
| list[sqlite3.Row] | List of sqlite3.Row objects. |
Raises:
| Type | Description |
|---|---|
| | If SQLite storage is not enabled. |
get_level
classmethod
get_level() -> str
Get current global log level.
info
info(
message: str,
*,
component: str | None = None,
**extra: Any,
) -> None
Log an info message.
query_sql
query_sql(
*,
level: str | None = None,
component: str | None = None,
since: float | None = None,
until: float | None = None,
limit: int | None = None,
offset: int = 0,
) -> list[LogEvent]
Query events from SQLite storage.
Similar to history.query() but queries the persistent SQLite database. Only available when db_path was provided at initialization.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| level | str \| None | Filter by log level (e.g., "INFO", "ERROR"). | None |
| component | str \| None | Filter by component name. | None |
| since | float \| None | Include events with timestamp >= since. | None |
| until | float \| None | Include events with timestamp <= until. | None |
| limit | int \| None | Maximum number of events to return. | None |
| offset | int | Number of events to skip. | 0 |
Returns:
| Type | Description |
|---|---|
| list[LogEvent] | List of matching LogEvent objects. |
Raises:
| Type | Description |
|---|---|
| | If SQLite storage is not enabled. |
set_level
classmethod
set_level(level: str) -> None
Set global log level for all environments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| level | str | Log level name (DEBUG, INFO, WARNING, ERROR, CRITICAL) | required |
warning
warning(
message: str,
*,
component: str | None = None,
**extra: Any,
) -> None
Log a warning message.
LogEvent
dataclass
Immutable record of a logged event.
component
class-attribute
instance-attribute
component: str | None = None
extra
class-attribute
instance-attribute
extra: dict[str, Any] = field(default_factory=dict)
level
instance-attribute
level: str
message
instance-attribute
message: str
timestamp
instance-attribute
timestamp: float
__init__
__init__(
timestamp: float,
level: str,
message: str,
component: str | None = None,
extra: dict[str, Any] = dict(),
) -> None
EventHistoryBuffer
Fixed-size ring buffer for log events.
__slots__
class-attribute
instance-attribute
__slots__ = ('_buffer',)
_buffer
instance-attribute
_buffer: deque[LogEvent] = deque(maxlen=max_size)
__init__
__init__(max_size: int = 1000) -> None
__iter__
__iter__() -> Iterator[LogEvent]
__len__
__len__() -> int
append
append(event: LogEvent) -> None
Add an event to the buffer.
clear
clear() -> None
Clear all events from the buffer.
query
query(
*,
level: str | None = None,
component: str | None = None,
since: float | None = None,
until: float | None = None,
) -> list[LogEvent]
Query history with optional filters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| level | str \| None | Filter by log level (e.g., "INFO", "ERROR") | None |
| component | str \| None | Filter by component name (e.g., "Server") | None |
| since | float \| None | Include events with timestamp >= since | None |
| until | float \| None | Include events with timestamp <= until | None |
Returns:
| Type | Description |
|---|---|
| list[LogEvent] | List of matching LogEvent objects |
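The ring-buffer behavior of EventHistoryBuffer follows from deque(maxlen=...): once the buffer is full, each append silently drops the oldest event. The sketch below mirrors that idea with hypothetical stand-ins (Event, RingBufferSketch); the filter semantics follow the parameter descriptions above, and combining filters with AND is an assumption.

```python
from collections import deque
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for LogEvent with the fields used by the filters."""
    timestamp: float
    level: str
    message: str
    component: Optional[str] = None


class RingBufferSketch:
    def __init__(self, max_size: int = 1000) -> None:
        self._buffer: deque[Event] = deque(maxlen=max_size)

    def append(self, event: Event) -> None:
        self._buffer.append(event)  # evicts the oldest event when full

    def __len__(self) -> int:
        return len(self._buffer)

    def query(
        self,
        *,
        level: Optional[str] = None,
        component: Optional[str] = None,
        since: Optional[float] = None,
        until: Optional[float] = None,
    ) -> list[Event]:
        # Assumption: every supplied filter must match (logical AND).
        return [
            e for e in self._buffer
            if (level is None or e.level == level)
            and (component is None or e.component == component)
            and (since is None or e.timestamp >= since)
            and (until is None or e.timestamp <= until)
        ]


buf = RingBufferSketch(max_size=3)
for t in range(5):
    level = "INFO" if t % 2 == 0 else "ERROR"
    buf.append(Event(float(t), level, f"event {t}", "Server"))

print([e.timestamp for e in buf.query(level="INFO")])
```

Only the three most recent events survive, so queries never see anything older than the buffer size allows; persistent history requires the SQLite store.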
SQLiteEventStore
Thread-safe SQLite storage for log events.
Provides persistent storage of log events across simulation runs. Multiple environments can share a single database file, distinguished by the env_id column.
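How several environments can share one log_events table is easiest to see with the standard sqlite3 module. The sketch below uses an in-memory database and the column layout documented for log_events; the environment ids, messages, and the choice to store extra as a JSON string are illustrative assumptions, and none of this calls SQLiteEventStore itself.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows support access by column name

conn.executescript(
    """
    CREATE TABLE IF NOT EXISTS log_events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        env_id TEXT NOT NULL,
        timestamp REAL NOT NULL,
        level TEXT NOT NULL,
        message TEXT NOT NULL,
        component TEXT,
        extra TEXT,
        wall_time TEXT NOT NULL
    );
    CREATE INDEX IF NOT EXISTS idx_level ON log_events(env_id, level);
    """
)

rows = [
    ("env-a", 1.0, "INFO", "job released", "Router", json.dumps({"job": 1}), "2024-01-01T00:00:00"),
    ("env-a", 2.5, "ERROR", "queue overflow", "Server", json.dumps({}), "2024-01-01T00:00:01"),
    ("env-b", 1.5, "ERROR", "unrelated run", "Server", json.dumps({}), "2024-01-01T00:00:02"),
]
conn.executemany(
    "INSERT INTO log_events (env_id, timestamp, level, message, component, extra, wall_time) "
    "VALUES (?, ?, ?, ?, ?, ?, ?)",
    rows,
)

# Filter on env_id first so runs sharing the file stay separate.
errors = conn.execute(
    "SELECT timestamp, message FROM log_events "
    "WHERE env_id = ? AND level = ? ORDER BY timestamp",
    ("env-a", "ERROR"),
).fetchall()
print([(row["timestamp"], row["message"]) for row in errors])
conn.close()
```

Parameterized queries of this shape are also what execute_sql accepts through its sql and params arguments.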
_SCHEMA
class-attribute
instance-attribute
_SCHEMA = "\n CREATE TABLE IF NOT EXISTS log_events (\n id INTEGER PRIMARY KEY AUTOINCREMENT,\n env_id TEXT NOT NULL,\n timestamp REAL NOT NULL,\n level TEXT NOT NULL,\n message TEXT NOT NULL,\n component TEXT,\n extra TEXT,\n wall_time TEXT NOT NULL\n );\n CREATE INDEX IF NOT EXISTS idx_env_id ON log_events(env_id);\n CREATE INDEX IF NOT EXISTS idx_timestamp ON log_events(env_id, timestamp);\n CREATE INDEX IF NOT EXISTS idx_level ON log_events(env_id, level);\n CREATE INDEX IF NOT EXISTS idx_component ON log_events(env_id, component);\n "
__slots__
class-attribute
instance-attribute
__slots__ = ('_db_path', '_conn', '_lock')
_conn
instance-attribute
_conn: Connection = connect(
str(_db_path), check_same_thread=False
)
_db_path
instance-attribute
_db_path: Path = Path(db_path)
_lock
instance-attribute
_lock: Lock = Lock()
__init__
__init__(db_path: str | Path) -> None
Open or create the SQLite database.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| db_path | str \| Path | Path to the SQLite database file. | required |
_init_schema
_init_schema() -> None
Create tables and indexes if they don't exist.
close
close() -> None
Close the database connection.
execute_sql
execute_sql(
sql: str, params: tuple[Any, ...] = ()
) -> list[sqlite3.Row]
Execute arbitrary SQL and return raw rows.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sql | str | SQL query to execute. | required |
| params | tuple[Any, ...] | Query parameters. | () |
Returns:
| Type | Description |
|---|---|
| list[sqlite3.Row] | List of sqlite3.Row objects. |
insert
insert(
env_id: str, event: LogEvent, wall_time: str
) -> None
Insert a single event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| env_id | str | Environment identifier. | required |
| event | LogEvent | The LogEvent to store. | required |
| wall_time | str | ISO8601 wall-clock time string. | required |
query
query(
env_id: str,
*,
level: str | None = None,
component: str | None = None,
since: float | None = None,
until: float | None = None,
limit: int | None = None,
offset: int = 0,
) -> list[LogEvent]
Query events with optional filters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| env_id | str | Environment identifier to filter by. | required |
| level | str \| None | Filter by log level (e.g., "INFO", "ERROR"). | None |
| component | str \| None | Filter by component name. | None |
| since | float \| None | Include events with timestamp >= since. | None |
| until | float \| None | Include events with timestamp <= until. | None |
| limit | int \| None | Maximum number of events to return. | None |
| offset | int | Number of events to skip. | 0 |
Returns:
| Type | Description |
|---|---|
| list[LogEvent] | List of matching LogEvent objects. |