Core API

The core of Simulatte is a small set of cooperating objects: Environment drives the SimPy clock and carries the logger; ShopFloor is the central orchestrator that tracks WIP, routes jobs, and fires hooks; ProductionJob represents a unit of work that moves through a sequence of Server resources; Router generates jobs and sends them to the floor or the pool; PreShopPool holds jobs before they are released to the floor; and Runner executes multiple simulation replications with independent random seeds. See the architecture diagram for how these objects interact.

Core objects

Environment

Bases: Environment

Thin wrapper around simpy.Environment with integrated logging.

Each environment has its own logger that:

- Automatically includes simulation time in log output
- Supports JSON or text output format
- Maintains an in-memory history buffer
- Supports per-component filtering

_logger instance-attribute

_logger = SimLogger(
    env=self,
    log_file=log_file,
    log_format=log_format,
    history_size=log_history_size,
    db_path=log_db_path,
)

log_history property

log_history: EventHistoryBuffer

Access the event history buffer.

Returns:

Type Description
EventHistoryBuffer

The EventHistoryBuffer containing recent log events. Use .query() to filter events by level, component, or time range.

Example

env.log_history.query(level="ERROR", since=100.0)
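As a rough mental model, a bounded history buffer with a query filter can be sketched in plain Python. This is not Simulatte's EventHistoryBuffer: the LogEvent and HistoryBuffer names, their fields, and the inclusive treatment of since are assumptions made for illustration. Only the level, component, and since filters mirror the description above.

```python
from __future__ import annotations

from collections import deque
from dataclasses import dataclass


@dataclass(frozen=True)
class LogEvent:  # hypothetical record type, for illustration only
    time: float
    level: str
    component: str | None
    message: str


class HistoryBuffer:  # hypothetical stand-in, not simulatte's EventHistoryBuffer
    def __init__(self, maxlen: int = 1000) -> None:
        # deque(maxlen=...) silently drops the oldest event once full.
        self._events: deque[LogEvent] = deque(maxlen=maxlen)

    def append(self, event: LogEvent) -> None:
        self._events.append(event)

    def query(self, *, level=None, component=None, since=None) -> list[LogEvent]:
        # Each filter is applied only when it is given.
        # Assumption: `since` is an inclusive lower bound on simulation time.
        return [
            e
            for e in self._events
            if (level is None or e.level == level)
            and (component is None or e.component == component)
            and (since is None or e.time >= since)
        ]


buf = HistoryBuffer(maxlen=2)
buf.append(LogEvent(50.0, "ERROR", "Server", "old"))
buf.append(LogEvent(120.0, "INFO", "ShopFloor", "released"))
buf.append(LogEvent(150.0, "ERROR", "Server", "jam"))  # evicts the t=50 event
errors = buf.query(level="ERROR", since=100.0)
```

The maxlen argument plays the role that log_history_size plays for the real buffer: once the limit is reached, the oldest events are discarded first.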

logger property

logger: SimLogger

Access the underlying SimLogger for advanced configuration.

Use this to enable/disable component-level filtering:

>>> env.logger.disable_component("Server")
>>> env.logger.enable_component("ShopFloor")

__enter__

__enter__() -> Environment

__exit__

__exit__(exc_type: object, exc: object, tb: object) -> None

__init__

__init__(
    *,
    log_file: str | Path | None = None,
    log_format: Literal["text", "json"] = "text",
    log_history_size: int = 1000,
    log_db_path: str | Path | None = None,
) -> None

Initialize the simulation environment.

Parameters:

Name Type Description Default
log_file str | Path | None

Optional file path for log output (defaults to stderr)

None
log_format Literal['text', 'json']

Output format ("text" or "json")

'text'
log_history_size int

Maximum number of events to keep in history buffer

1000
log_db_path str | Path | None

Optional SQLite database path for persistent event storage. If provided, events are stored in both memory buffer and SQLite.

None

close

close() -> None

Release logger resources associated with this environment.

debug

debug(
    message: str,
    *,
    component: str | None = None,
    **extra: Any,
) -> None

Log a debug message with simulation time context.

Parameters:

Name Type Description Default
message str

The log message

required
component str | None

Optional component class name for filtering (e.g., "Server")

None
**extra Any

Additional structured data to include in the log

{}

error

error(
    message: str,
    *,
    component: str | None = None,
    **extra: Any,
) -> None

Log an error message with simulation time context.

Parameters:

Name Type Description Default
message str

The log message

required
component str | None

Optional component class name for filtering (e.g., "Server")

None
**extra Any

Additional structured data to include in the log

{}

info

info(
    message: str,
    *,
    component: str | None = None,
    **extra: Any,
) -> None

Log an info message with simulation time context.

Parameters:

Name Type Description Default
message str

The log message

required
component str | None

Optional component class name for filtering (e.g., "Server")

None
**extra Any

Additional structured data to include in the log

{}

step

step() -> None

Process the next event in the queue.

If the user interrupts the simulation with a KeyboardInterrupt, step() raises a StopSimulation exception so that the simulation pauses gracefully.

warning

warning(
    message: str,
    *,
    component: str | None = None,
    **extra: Any,
) -> None

Log a warning message with simulation time context.

Parameters:

Name Type Description Default
message str

The log message

required
component str | None

Optional component class name for filtering (e.g., "Server")

None
**extra Any

Additional structured data to include in the log

{}

ShopFloor

Central orchestrator for job flow through a manufacturing simulation.

The ShopFloor manages the complete lifecycle of production jobs as they move through a sequence of servers. It tracks work-in-progress (WIP) at each server, maintains metrics for performance monitoring, and signals events when jobs complete processing steps or finish entirely.

Extensibility is provided through composition:

- on_before_operation / on_after_operation: Hooks for custom logic at each operation
- wip_strategy: Pluggable WIP calculation
- metrics_collector: Pluggable metrics recording
- time_series_collector: Pluggable time-series data collection
- on_job_finished: Callbacks when jobs complete
- material_coordinator: Optional material delivery coordination

Attributes:

Name Type Description
env

The simulation environment providing time and process management.

material_coordinator

Optional coordinator for material delivery.

servers list[Server]

List of servers registered with this shop floor.

jobs set[ProductionJob]

Set of jobs currently being processed on the shop floor.

jobs_done list[ProductionJob]

List of completed jobs in order of completion.

wip dict[Server, float]

Dictionary mapping each server to its current WIP value.

total_time_in_system float

Cumulative time spent by all completed jobs.

job_processing_end

SimPy event triggered when any job finishes processing at a server. Recreated after each trigger.

job_finished_event

SimPy event triggered when any job completes its entire routing. Recreated after each trigger.

maximum_wip_value float

Peak total WIP observed during simulation.

maximum_shopfloor_jobs int

Peak number of concurrent jobs observed.

Example

Basic usage with hooks:

from simulatte import Environment, Server, ProductionJob, ShopFloor

def setup_hook(job, server, op_index, pt):
    yield server.env.timeout(1.0)  # 1s setup time

env = Environment()
shop_floor = ShopFloor(env=env, on_before_operation=setup_hook)
server = Server(env=env, capacity=1, shopfloor=shop_floor)

job = ProductionJob(
    env=env, sku="PART-A", servers=[server],
    processing_times=[10.0], due_date=100.0,
)

shop_floor.add(job)
env.run()

_after_operation instance-attribute

_after_operation: list[OperationHook] = _normalize_hooks(
    on_after_operation
)

_before_operation instance-attribute

_before_operation: list[OperationHook] = _normalize_hooks(
    on_before_operation
)

_metrics_collector instance-attribute

_metrics_collector: MetricsCollector | None = (
    EMAMetricsCollector(alpha=ema_alpha)
)

_on_job_finished instance-attribute

_on_job_finished: list[Callable[[ProductionJob], None]] = (
    _normalize_callbacks(on_job_finished)
)

_processing_end_callbacks instance-attribute

_processing_end_callbacks: list[
    Callable[[ProductionJob, Server], None]
] = []

_time_series_collector instance-attribute

_time_series_collector: TimeSeriesCollector | None = (
    time_series_collector
)

_wip_strategy instance-attribute

_wip_strategy: WIPStrategy = (
    wip_strategy
    if wip_strategy is not None
    else StandardWIPStrategy()
)

average_time_in_system property

average_time_in_system: float

Average time jobs spend in the system from first server entry to completion.

Calculated as total_time_in_system divided by the number of completed jobs. Returns 0.0 if no jobs have completed yet.

Returns:

Type Description
float

Average time in system for all completed jobs, or 0.0 if none completed.
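The calculation described above amounts to a guarded division. The standalone function below is a sketch of that arithmetic only; its name and signature are illustrative and are not part of the ShopFloor API.

```python
def average_time_in_system(total_time_in_system: float, completed_jobs: int) -> float:
    """Mean time in system, or 0.0 when no job has completed yet."""
    # Guard against division by zero before the first completion.
    if completed_jobs == 0:
        return 0.0
    return total_time_in_system / completed_jobs


no_jobs_yet = average_time_in_system(0.0, 0)
four_jobs = average_time_in_system(30.0, 4)
```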

env instance-attribute

env = env

job_finished_event instance-attribute

job_finished_event = event()

job_processing_end instance-attribute

job_processing_end = event()

jobs instance-attribute

jobs: set[ProductionJob] = set()

jobs_done instance-attribute

jobs_done: list[ProductionJob] = []

material_coordinator instance-attribute

material_coordinator = material_coordinator

maximum_shopfloor_jobs instance-attribute

maximum_shopfloor_jobs: int = 0

maximum_wip_value instance-attribute

maximum_wip_value: float = 0.0

metrics_collector property

metrics_collector: MetricsCollector | None

Collector called when jobs complete (or None if disabled).

servers instance-attribute

servers: list[Server] = []

time_series_collector property

time_series_collector: TimeSeriesCollector | None

Collector for time-series data (or None if disabled).

total_time_in_system instance-attribute

total_time_in_system: float = 0.0

wip instance-attribute

wip: dict[Server, float] = {}

wip_strategy property

wip_strategy: WIPStrategy

The current WIP strategy used by the shopfloor.

__init__

__init__(
    *,
    env: Environment,
    ema_alpha: float = 0.01,
    material_coordinator: MaterialCoordinator | None = None,
    wip_strategy: WIPStrategy | None = None,
    metrics_collector: MetricsCollector
    | None
    | object = _DEFAULT_METRICS_COLLECTOR,
    collect_time_series: bool = False,
    time_series_collector: TimeSeriesCollector
    | None = None,
    on_before_operation: OperationHook
    | Sequence[OperationHook]
    | None = None,
    on_after_operation: OperationHook
    | Sequence[OperationHook]
    | None = None,
    on_job_finished: Callable[[ProductionJob], None]
    | Sequence[Callable[[ProductionJob], None]]
    | None = None,
) -> None

Initialize a new ShopFloor instance.

Parameters:

Name Type Description Default
env Environment

The simulation environment that provides time management, event scheduling, and process coordination.

required
ema_alpha float

Smoothing factor for the default EMAMetricsCollector. Must be in range (0, 1]. Ignored if a custom metrics_collector is provided. Defaults to 0.01.

0.01
material_coordinator MaterialCoordinator | None

Optional coordinator for handling material delivery to servers. When provided, the shop floor will ensure materials are delivered before processing begins at each operation, implementing FIFO blocking behavior.

None
wip_strategy WIPStrategy | None

Strategy for WIP calculation. Defaults to StandardWIPStrategy which uses full processing times.

None
metrics_collector MetricsCollector | None | object

Collector for job completion metrics. Defaults to EMAMetricsCollector. Pass None to disable metrics.

_DEFAULT_METRICS_COLLECTOR
collect_time_series bool

If True and time_series_collector is None, creates a DefaultTimeSeriesCollector for WIP, job count, throughput, and lateness tracking. Defaults to False.

False
time_series_collector TimeSeriesCollector | None

Collector for time-series data. If provided, overrides collect_time_series. Pass None to disable time-series collection. Defaults to None.

None
on_before_operation OperationHook | Sequence[OperationHook] | None

Hook(s) called after acquiring server but before material delivery and processing. Can be a single hook or list.

None
on_after_operation OperationHook | Sequence[OperationHook] | None

Hook(s) called after processing completes but before signaling. Can be a single hook or list.

None
on_job_finished Callable[[ProductionJob], None] | Sequence[Callable[[ProductionJob], None]] | None

Callback(s) called when a job completes its entire routing. Can be a single callable or list.

None
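To give a feel for what ema_alpha controls, the sketch below applies the textbook exponential moving average update. The exact update and initialization used by EMAMetricsCollector are not shown in this reference, so the ema_update function is an assumption for illustration, not the library's code.

```python
def ema_update(previous: float, observation: float, alpha: float) -> float:
    """Textbook EMA step: blend the new observation into the running value."""
    if not 0.0 < alpha <= 1.0:
        raise ValueError("alpha must be in the range (0, 1]")
    return alpha * observation + (1.0 - alpha) * previous


# With alpha = 1.0 the average simply tracks the latest observation.
tracks_latest = ema_update(10.0, 20.0, alpha=1.0)

# With the default alpha = 0.01 a single observation moves the average only slightly.
slow_moving = ema_update(10.0, 20.0, alpha=0.01)
```

A small alpha smooths out noise at the cost of reacting slowly to real shifts; a larger alpha does the opposite.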

_fire_processing_end_callbacks

_fire_processing_end_callbacks(
    job: ProductionJob, server: Server
) -> None

Invoke on_processing_end callbacks after server release.

Called after the with server.request() context exits, meaning servers_exit_at is stamped and the server resource is freed. This is the correct point for release-policy decisions that inspect server state or job.previous_server.

Parameters:

Name Type Description Default
job ProductionJob

The job that just completed processing.

required
server Server

The server where processing completed.

required

_normalize_callbacks staticmethod

_normalize_callbacks(
    callbacks: Callable[[ProductionJob], None]
    | Sequence[Callable[[ProductionJob], None]]
    | None,
) -> list[Callable[[ProductionJob], None]]

Normalize callback parameter to a list.

_normalize_hooks staticmethod

_normalize_hooks(
    hooks: OperationHook | Sequence[OperationHook] | None,
) -> list[OperationHook]

Normalize hook parameter to a list.

add

add(job: ProductionJob) -> None

Release a job from the Pre-Shop Pool onto the shop floor.

This method performs the following actions:

  1. Adds the job to the active jobs set
  2. Updates WIP values via the configured WIP strategy
  3. Records the PSP exit timestamp on the job
  4. Spawns the main processing coroutine for the job

Parameters:

Name Type Description Default
job ProductionJob

The production job to release onto the shop floor. The job's routing must contain valid server references.

required
Note

This method sets the job's psp_exit_at timestamp and spawns a SimPy process. The job begins queuing at its first server immediately after this call.

attach_dispatcher

attach_dispatcher(
    dispatcher: object, *, psp: PreShopPool | None = None
) -> None

Wire a dispatcher object's hook methods to this shopfloor.

Detects which hook methods exist on the dispatcher and registers only those that are callable. This allows partial implementations where a dispatcher only handles a subset of events.

Parameters:

Name Type Description Default
dispatcher object

Object with any combination of on_before_operation, on_after_operation, on_job_finished, on_processing_end, and on_psp_arrival methods.

required
psp PreShopPool | None

If provided and dispatcher has on_psp_arrival, registers an arrival subscription on the PSP.

None
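The detection step described above can be approximated with getattr and callable. This is a sketch of the idea only: detect_hooks and PartialDispatcher are hypothetical names, and the real attach_dispatcher also performs the registration, which is omitted here.

```python
HOOK_NAMES = (
    "on_before_operation",
    "on_after_operation",
    "on_job_finished",
    "on_processing_end",
    "on_psp_arrival",
)


def detect_hooks(dispatcher: object) -> dict:
    """Return the hook methods that exist on the dispatcher and are callable."""
    found = {}
    for name in HOOK_NAMES:
        candidate = getattr(dispatcher, name, None)
        if callable(candidate):
            found[name] = candidate
    return found


class PartialDispatcher:  # hypothetical dispatcher implementing one hook
    on_before_operation = None  # present but not callable, so it is skipped

    def on_job_finished(self, job) -> None:
        pass


detected = detect_hooks(PartialDispatcher())
```

Because missing or non-callable attributes are skipped, a dispatcher can implement only the events it cares about.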

main

main(job: ProductionJob) -> ProcessGenerator

Execute the main processing loop for a job through all its servers.

This generator manages the complete lifecycle of a job as it moves through its routing. For each server in the job's routing, it:

  1. Requests and acquires the server resource (queuing if busy)
  2. Executes on_before_operation hooks
  3. If a MaterialCoordinator is configured, waits for material delivery
  4. Processes the job for the specified duration
  5. Updates WIP via the configured WIP strategy
  6. Executes on_after_operation hooks
  7. Signals processing completion (job_processing_end event + on_processing_end callbacks)

After all operations complete, it:

- Records the finish timestamp on the job
- Moves the job from active (jobs) to completed (jobs_done)
- Records metrics via the configured metrics collector
- Calls on_job_finished callbacks
- Signals job completion via signal_job_finished()

Parameters:

Name Type Description Default
job ProductionJob

The production job to process through its routing.

required

Yields:

Type Description
ProcessGenerator

SimPy events for server requests, hooks, material delivery, and processing.

Note

This method is automatically spawned by add() and should not be called directly. It runs as a SimPy process until the job completes.

on_after_operation

on_after_operation(hook: OperationHook) -> None

Register a hook to run after each operation.

Hooks registered post-construction execute after any hooks passed via __init__, in registration order.

on_before_operation

on_before_operation(hook: OperationHook) -> None

Register a hook to run before each operation.

Hooks registered post-construction execute after any hooks passed via __init__, in registration order.

on_job_finished

on_job_finished(
    callback: Callable[[ProductionJob], None],
) -> None

Register a callback for when a job completes its entire routing.

Callbacks registered post-construction execute after any callbacks passed via __init__, in registration order.
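The ordering rule for constructor-supplied and later-registered callbacks can be illustrated with a small registry. Registry and normalize are hypothetical stand-ins written for this sketch; they imitate the documented behaviour (accept None, a single callable, or a sequence, then append later registrations) and are not the ShopFloor implementation.

```python
from __future__ import annotations

from collections.abc import Callable, Sequence


def normalize(value: Callable | Sequence[Callable] | None) -> list[Callable]:
    """Turn None, a single callable, or a sequence of callables into a list."""
    if value is None:
        return []
    if callable(value):
        return [value]
    return list(value)


class Registry:  # hypothetical stand-in, not simulatte's ShopFloor
    def __init__(self, on_job_finished=None) -> None:
        self._on_job_finished = normalize(on_job_finished)

    def on_job_finished(self, callback: Callable) -> None:
        # Later registrations are appended, so they run after constructor ones.
        self._on_job_finished.append(callback)

    def fire(self, job: str) -> None:
        for callback in self._on_job_finished:
            callback(job)


calls: list[str] = []
registry = Registry(on_job_finished=lambda job: calls.append(f"init:{job}"))
registry.on_job_finished(lambda job: calls.append(f"late:{job}"))
registry.fire("J1")
```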

on_processing_end

on_processing_end(
    callback: Callable[[ProductionJob, Server], None],
) -> None

Register a callback for when a job completes processing at any server.

Callbacks are invoked synchronously after the server is released (servers_exit_at is stamped and job.previous_server is available). This fires after each operation, not just when the job finishes its entire routing.

Note

The SimPy job_processing_end event is succeeded earlier (while the server is still held). These callbacks fire after server release and always run before SimPy process-based listeners resume.

Parameters:

Name Type Description Default
callback Callable[[ProductionJob, Server], None]

Function called with (job, server) after processing completes.

required

set_metrics_collector

set_metrics_collector(
    collector: MetricsCollector | None,
) -> None

Replace the shopfloor's metrics collector (or disable with None).

set_time_series_collector

set_time_series_collector(
    collector: TimeSeriesCollector | None,
) -> None

Replace the shopfloor's time-series collector (or disable with None).

set_wip_strategy

set_wip_strategy(strategy: WIPStrategy) -> None

Replace the shopfloor's WIP strategy.

signal_job_finished

signal_job_finished(job: ProductionJob) -> None

Signal that a job has completed its entire routing.

This method triggers the job_finished_event with the completed job as the event value, notifying any waiting processes that a job has finished all operations. The event is then recreated for the next signal.

Unlike job_processing_end, this is only called once per job when it completes its final operation.

Parameters:

Name Type Description Default
job ProductionJob

The job that just completed its entire routing.

required
Example

Counting completed jobs:

completed = 0
while completed < target:
    job = yield shop_floor.job_finished_event
    completed += 1
    print(f"Job {job.id} completed. Total: {completed}")

Server

Bases: PriorityResource

A server/workstation for job-shop simulation with queue and utilization tracking.

Server extends SimPy's PriorityResource to process jobs with priority-based queueing. It tracks queue lengths, utilization rates, and optionally records time-series data for visualization. When attached to a ShopFloor, the server is automatically registered and assigned an index for identification.

Dynamic priorities: queued jobs' priorities are refreshed before every dispatch decision. sort_queue re-evaluates each queued request's job.priority_policy and rewrites req.key; _trigger_put (the SimPy hook invoked on both new-arrival and release paths) calls sort_queue before delegating to SimPy. Callers may also invoke sort_queue explicitly to observe the resulting order between events. The cost per dispatch decision is one priority_policy call per queued request; policies must be deterministic given (job, server) and the current simulation state at call time.

_idx instance-attribute

_idx = index(self)

_jobs instance-attribute

_jobs: list[BaseJob] | None = (
    [] if retain_job_history else None
)

_last_queue_level instance-attribute

_last_queue_level: int = 0

_last_queue_level_timestamp instance-attribute

_last_queue_level_timestamp: float = 0

_qt instance-attribute

_qt: list[tuple[float, int]] | None = (
    [] if collect_time_series else None
)

_queue_history instance-attribute

_queue_history: dict[int, float] = defaultdict(float)

_ut instance-attribute

_ut: list[tuple[float, float]] | None = (
    [(0, 0.0)] if collect_time_series else None
)

average_queue_length property

average_queue_length: float

Time-weighted average queue length over the simulation.
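A time-weighted average weights each queue length by how long the queue stayed at that length. The sketch below assumes a histogram that maps queue level to accumulated duration, which is what the dict[int, float] type of _queue_history suggests but this reference does not confirm. The function name is illustrative.

```python
def time_weighted_average(level_durations: dict[int, float]) -> float:
    """Average queue length, weighting each level by the time spent at it."""
    total_time = sum(level_durations.values())
    if total_time == 0:
        return 0.0  # nothing observed yet
    weighted = sum(level * duration for level, duration in level_durations.items())
    return weighted / total_time


# 6 time units with an empty queue, 3 with one job waiting, 1 with two waiting.
histogram = {0: 6.0, 1: 3.0, 2: 1.0}
avg_queue = time_weighted_average(histogram)  # (0*6 + 1*3 + 2*1) / 10
```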

current_jobs property

current_jobs: tuple[BaseJob, ...]

Jobs currently occupying active server slots (includes hook/material phases).

empty property

empty: bool

Whether the queue is empty.

env instance-attribute

env = env

idle_time property

idle_time: float

Total time the server has been idle.

is_idle property

is_idle: bool

Whether the server has no active users and an empty queue.

queueing_jobs property

queueing_jobs: Iterable[BaseJob]

Iterator over jobs currently waiting in the queue.

utilization_rate property

utilization_rate: float

Fraction of time the server has been busy (0 to 1).

worked_time instance-attribute

worked_time: float = 0

__init__

__init__(
    *,
    env: Environment,
    capacity: int,
    shopfloor: ShopFloor | None = None,
    collect_time_series: bool = False,
    retain_job_history: bool = False,
) -> None

Initialize a server resource.

Parameters:

Name Type Description Default
env Environment

The simulation environment.

required
capacity int

Maximum number of jobs that can be processed simultaneously.

required
shopfloor ShopFloor | None

Optional ShopFloor for automatic registration. If provided, the server is added to the shopfloor's server list.

None
collect_time_series bool

If True, record queue length and utilization over time for later visualization via plot_qt() and plot_ut().

False
retain_job_history bool

If True, maintain a list of all processed jobs.

False

__repr__

__repr__() -> str

_trigger_put

_trigger_put(get_event: Release | None) -> None

Refresh queue priorities before SimPy iterates the put queue.

Overrides simpy.resources.base.BaseResource._trigger_put to call sort_queue (which re-evaluates job.priority_policy for every queued request and rewrites req.key) before delegating to SimPy. SimPy invokes _trigger_put from two call sites: simpy.resources.base.Put.__init__ (after a new arrival is appended to put_queue) and as a callback on every Release event (simpy.resources.base.Get.__init__ registers it). Refreshing here therefore covers both the new-arrival and release dispatch paths.

_update_qt

_update_qt() -> None

Record current queue length to the time-series if collection is enabled.

_update_queue_history

_update_queue_history(_: Event | None) -> None

Update queue histogram and trigger time-series updates.

_update_ut

_update_ut() -> None

Record current utilization to the time-series if collection is enabled.

plot_qt

plot_qt() -> None

Display a step plot of queue length over simulation time.

Raises:

Type Description
RuntimeError

If time-series collection was not enabled at initialization.

plot_ut

plot_ut() -> None

Display a step plot of utilization rate over simulation time.

Raises:

Type Description
RuntimeError

If time-series collection was not enabled at initialization.

process_job

process_job(
    job: BaseJob, processing_time: float
) -> ProcessGenerator

Simulate processing a job for a given duration.

This generator yields a timeout event for the processing duration and updates worked_time. Should be called within a request context.

Parameters:

Name Type Description Default
job BaseJob

The job being processed.

required
processing_time float

Duration of processing in simulation time units.

required

Yields:

Type Description
ProcessGenerator

A SimPy timeout event for the processing duration.

release

release(request: ServerPriorityRequest) -> Release

Release the server after job processing.

Records the job's exit time and updates utilization tracking.

Parameters:

Name Type Description Default
request ServerPriorityRequest

The ServerPriorityRequest to release.

required

Returns:

Type Description
Release

A SimPy Release event.

request

request(
    *, job: BaseJob, preempt: bool = True
) -> ServerPriorityRequest

Request server access for a job with priority-based queueing.

Creates a priority request that enters the server queue. The request should be used as a context manager to ensure proper release.

Parameters:

Name Type Description Default
job BaseJob

The job requesting server access.

required
preempt bool

If True, this request can preempt lower-priority jobs.

True

Returns:

Type Description
ServerPriorityRequest

A ServerPriorityRequest to be yielded and used as a context manager.

sort_queue

sort_queue() -> None

Refresh queued requests' priority keys and resort.

For each request in the queue, calls req.job.priority(req.server) to obtain the current priority and rewrites req.key accordingly, then sorts the queue in ascending order by the refreshed keys.

Called automatically before every dispatch decision via _trigger_put. May also be invoked explicitly by user code that has mutated priority_policy and wants to observe the new order before the next dispatch event.

Note: req.priority is not refreshed; it remains the snapshot taken at request construction. To inspect a queued job's current priority, call req.job.priority(req.server) directly.

Requires that every queued request expose job, server, time, and preempt attributes (which ServerPriorityRequest does).
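The refresh-then-sort behaviour can be sketched without SimPy. FakeJob and FakeRequest are hypothetical stand-ins, and the key layout (priority, time, not preempt) is assumed to follow the one SimPy's PriorityRequest uses; the real sort_queue operates on the server's own queue rather than on a list passed in.

```python
from dataclasses import dataclass


class FakeJob:  # hypothetical stand-in for a job with a priority policy
    def __init__(self, name, priority_policy):
        self.name = name
        self.priority_policy = priority_policy

    def priority(self, server):
        return self.priority_policy(self, server)


@dataclass
class FakeRequest:  # hypothetical stand-in for ServerPriorityRequest
    job: FakeJob
    server: object
    time: float
    preempt: bool = True
    key: tuple = ()


def sort_queue(queue: list) -> None:
    # Re-evaluate every queued job's priority, rewrite the key, then sort ascending.
    for req in queue:
        req.key = (req.job.priority(req.server), req.time, not req.preempt)
    queue.sort(key=lambda req: req.key)


urgency = {"A": 1.0, "B": 2.0}  # lower value sorts first


def policy(job, server):
    return urgency[job.name]


queue = [
    FakeRequest(FakeJob("A", policy), None, time=0.0),
    FakeRequest(FakeJob("B", policy), None, time=1.0),
]
sort_queue(queue)
order_before = [req.job.name for req in queue]

urgency["B"] = 0.5  # the state the policy reads has changed
sort_queue(queue)
order_after = [req.job.name for req in queue]
```

Job B overtakes job A only after the second sort, which is why priorities need to be refreshed before each dispatch decision rather than fixed at request time.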

ProductionJob

Bases: BaseJob

A production job that flows through servers with optional material requirements.

Production jobs represent manufacturing orders that require processing at one or more servers. They can optionally specify material requirements that must be delivered before processing can begin at each operation.

__slots__ class-attribute instance-attribute

__slots__ = ('material_requirements',)

material_requirements instance-attribute

material_requirements = material_requirements or {}

__init__

__init__(
    *,
    env: Environment,
    sku: str,
    servers: Sequence[Server],
    processing_times: Sequence[float],
    due_date: SimTime,
    priority_policy: Callable[[Any, Server], float]
    | None = None,
    material_requirements: dict[int, dict[str, int]]
    | None = None,
) -> None

Initialize a production job.

Parameters:

Name Type Description Default
env Environment

The simulation environment.

required
sku str

Job SKU identifier.

required
servers Sequence[Server]

Sequence of servers in the job's routing.

required
processing_times Sequence[float]

Processing time at each server.

required
due_date SimTime

Target completion time.

required
priority_policy Callable[[Any, Server], float] | None

Optional function to compute priority at each server.

None
material_requirements dict[int, dict[str, int]] | None

Optional mapping from operation index to required materials. Format: {op_index: {product_name: quantity}}. Example: {0: {"steel": 2, "bolts": 10}} means operation 0 requires 2 units of steel and 10 bolts to be delivered before processing.

None

__repr__

__repr__() -> str

get_materials_for_operation

get_materials_for_operation(
    op_index: int,
) -> dict[str, int]

Get material requirements for a specific operation.

Parameters:

Name Type Description Default
op_index int

The operation index (0-based).

required

Returns:

Type Description
dict[str, int]

Dictionary mapping product names to required quantities, or an empty dict if no materials are required.
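The lookup behaves like a dictionary get with an empty-dict fallback. The helper below is an illustrative sketch using the {op_index: {product_name: quantity}} format documented for material_requirements; it is not the method's actual source.

```python
def materials_for_operation(material_requirements: dict, op_index: int) -> dict:
    """Return the materials needed for one operation, or {} if none are listed."""
    return material_requirements.get(op_index, {})


requirements = {0: {"steel": 2, "bolts": 10}}
first_op = materials_for_operation(requirements, 0)
second_op = materials_for_operation(requirements, 1)  # no entry for operation 1
```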

Router

Stochastic job generator that routes jobs through the simulation.

The Router continuously generates ProductionJob instances at random intervals determined by the inter-arrival distribution. Each job is assigned a randomly selected SKU, a routing through servers, and processing times sampled from configured distributions.

Jobs are routed based on system configuration:

- Push system (psp=None): Jobs go directly to the ShopFloor
- Pull system (psp set): Jobs queue in the PreShopPool until released

Upon instantiation, the Router registers itself as a SimPy process that runs for the duration of the simulation.

due_date_offset_distribution instance-attribute

due_date_offset_distribution = due_date_offset_distribution

due_date_rule instance-attribute

due_date_rule = due_date_rule

env instance-attribute

env = env

inter_arrival_distribution instance-attribute

inter_arrival_distribution = inter_arrival_distribution

priority_policies instance-attribute

priority_policies = priority_policies

psp instance-attribute

psp = psp

servers instance-attribute

servers = servers

shopfloor instance-attribute

shopfloor = shopfloor

sku_distributions instance-attribute

sku_distributions = sku_distributions

sku_routings instance-attribute

sku_routings = sku_routings

sku_service_times instance-attribute

sku_service_times = sku_service_times

__init__

__init__(
    *,
    env: Environment,
    shopfloor: ShopFloor,
    servers: Sequence[Server],
    psp: PreShopPool | None,
    inter_arrival_distribution: Sampler[float],
    sku_distributions: DiscreteDistribution[str, float],
    sku_routings: dict[str, Callable[[], Sequence[Server]]],
    sku_service_times: dict[
        str, DiscreteDistribution[Server, Sampler[float]]
    ],
    due_date_offset_distribution: dict[str, Sampler[float]],
    priority_policies: Callable[
        [ProductionJob, Server], float
    ]
    | None = None,
    due_date_rule: dict[
        str, Callable[[Sequence[float]], float]
    ]
    | None = None,
) -> None

Initialize the Router and start the job generation process.

Parameters:

Name Type Description Default
env Environment

The simulation environment.

required
shopfloor ShopFloor

ShopFloor instance managing job flow and WIP tracking.

required
servers Sequence[Server]

Sequence of all available Server instances in the system.

required
psp PreShopPool | None

PreShopPool for pull systems, or None for push systems where jobs go directly to the ShopFloor.

required
inter_arrival_distribution Sampler[float]

Callable returning the time until the next job arrival (e.g., lambda: random.expovariate(1.0)).

required
sku_distributions DiscreteDistribution[str, float]

Mapping from SKU names to probability weights for random SKU selection (e.g., {"A": 0.5, "B": 0.3, "C": 0.2}).

required
sku_routings dict[str, Callable[[], Sequence[Server]]]

Mapping from SKU to a callable that returns the server routing sequence for that SKU.

required
sku_service_times dict[str, DiscreteDistribution[Server, Sampler[float]]]

Nested mapping {sku: {server: distribution}} where each distribution is a callable returning the processing time.

required
due_date_offset_distribution dict[str, Sampler[float]]

Mapping from SKU to a callable returning the offset used to compute due date (due_date = now + offset).

required
priority_policies Callable[[ProductionJob, Server], float] | None

Optional callable (job, server) -> float for computing job priority at each server.

None
due_date_rule dict[str, Callable[[Sequence[float]], float]] | None

Optional per-SKU mapping to a callable (processing_times) -> float that derives the due-date offset from the job's sampled operation processing times (its total work content). When a SKU has an entry here it takes precedence over due_date_offset_distribution for that SKU; otherwise the flat offset is used. This enables work-content due-date rules such as Total Work Content (TWK): due_date = now + K * sum(p_ij).

None
Example

router = Router(
    env=env,
    shopfloor=shop_floor,
    servers=servers,
    psp=None,  # Push system
    inter_arrival_distribution=lambda: random.expovariate(1.0),
    sku_distributions={"F1": 1.0},
    sku_routings={"F1": lambda: servers},
    sku_service_times={"F1": {s: lambda: 2.0 for s in servers}},
    due_date_offset_distribution={"F1": lambda: 30.0},
)
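A Total Work Content rule of the kind described for due_date_rule can be written as a small factory. The make_twk_rule helper and the value K = 4.0 are assumptions chosen for illustration; only the (processing_times) -> float shape and the formula due_date = now + K * sum(p_ij) come from the parameter description.

```python
from collections.abc import Callable, Sequence


def make_twk_rule(k: float) -> Callable[[Sequence[float]], float]:
    """Build a rule that returns K times the job's total work content."""

    def rule(processing_times: Sequence[float]) -> float:
        return k * sum(processing_times)

    return rule


twk = make_twk_rule(4.0)  # K = 4.0 is an arbitrary example value
now = 100.0
processing_times = [2.0, 3.5, 1.5]  # sampled per-operation times, total 7.0
due_date = now + twk(processing_times)  # 100 + 4 * 7
```

Such a rule could then be supplied per SKU, for example as due_date_rule={"F1": make_twk_rule(4.0)}.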

generate_job

generate_job() -> Generator[Timeout, None, NoReturn]

Infinite generator that creates and routes jobs at random intervals.

This method runs as a SimPy process for the simulation's duration. On each iteration it:

  1. Waits for the inter-arrival time
  2. Samples a random SKU based on configured weights
  3. Generates a routing and processing times for the selected SKU
  4. Creates a ProductionJob with computed due date
  5. Routes the job to PSP (if configured) or directly to ShopFloor

Yields:

Type Description
Timeout

simpy.Timeout: Pauses the process until the next job arrival.
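Steps 1 and 2 above can be sketched with the standard library alone. How Router samples internally is not shown in this reference, so sample_sku, the use of random.choices, and the seeded generator are assumptions for illustration.

```python
import random

rng = random.Random(42)  # seeded so the sketch is repeatable
sku_distributions = {"A": 0.5, "B": 0.3, "C": 0.2}


def sample_sku(weights: dict, rng: random.Random) -> str:
    """Pick one SKU with probability proportional to its weight."""
    skus = list(weights)
    return rng.choices(skus, weights=[weights[s] for s in skus], k=1)[0]


# Step 1: time until the next arrival (exponential with rate 1.0).
gap = rng.expovariate(1.0)

# Step 2: draw many SKUs to see the weights reflected in the counts.
counts = {sku: 0 for sku in sku_distributions}
for _ in range(10_000):
    counts[sample_sku(sku_distributions, rng)] += 1
```

Over many draws the counts approach the configured proportions, so SKU "A" appears roughly half the time.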

PreShopPool

Buffer queue for jobs awaiting shopfloor release.

A pure container with no built-in release logic. Release policies are implemented as external SimPy processes using the trigger functions from simulatte.policies.triggers.

The pool provides a new_job event that external processes can monitor to react immediately when jobs arrive (e.g., for starvation avoidance).

Example

from simulatte.policies.triggers import periodic_trigger, on_arrival_trigger

psp = PreShopPool(env=env, shopfloor=shopfloor)
env.process(periodic_trigger(psp, 1.0, my_release_fn))
env.process(on_arrival_trigger(psp, my_on_arrival_fn))

_arrival_callbacks instance-attribute

_arrival_callbacks: list[
    Callable[[ProductionJob, PreShopPool], None]
] = []

_psp instance-attribute

_psp: deque[ProductionJob] = deque()

empty property

empty: bool

Whether the pool contains no jobs.

env instance-attribute

env = env

jobs property

jobs: Iterable[ProductionJob]

Iterate over jobs in the pool in FIFO order (oldest first).

new_job instance-attribute

new_job = event()

shopfloor instance-attribute

shopfloor = shopfloor

__contains__

__contains__(job: ProductionJob) -> bool

Check if a job is currently in the pool.

__getitem__

__getitem__(index: int) -> ProductionJob

Get a job by its position in the queue (0 = oldest).

__init__

__init__(*, env: Environment, shopfloor: ShopFloor) -> None

Initialize the pre-shop pool.

Parameters:

Name Type Description Default
env Environment

The simulation environment.

required
shopfloor ShopFloor

The shopfloor that will receive released jobs.

required

__len__

__len__() -> int

Return the number of jobs currently in the pool.

_signal_new_job

_signal_new_job(job: ProductionJob) -> None

Invoke arrival callbacks and trigger the new_job event.

First invokes all registered on_arrival callbacks synchronously, then succeeds the SimPy new_job event (waking process-based listeners).

Parameters:

Name Type Description Default
job ProductionJob

The job to pass to callbacks and as the event's value.

required

add

add(job: ProductionJob) -> None

Add a job to the pool and signal its arrival.

Appends the job to the end of the queue, invokes any registered on_arrival callbacks, and then triggers the new_job event, allowing event-driven processes (e.g., starvation avoidance) to react immediately to the new arrival.

Parameters:

Name Type Description Default
job ProductionJob

The production job to add to the pool.

required

jobs_starting_at

jobs_starting_at(server: Server) -> list[ProductionJob]

Return jobs in the pool whose routing begins at the given server.

Parameters:

Name Type Description Default
server Server

The server to filter by.

required

Returns:

Type Description
list[ProductionJob]

List of jobs whose first routing server matches, in FIFO order.
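As a rough illustration of the filter, the sketch below selects pooled jobs by their first routing step, which is the kind of lookup a starvation-avoidance policy might use to feed an idle server. SketchJob and its routing attribute are hypothetical stand-ins; the real ProductionJob attribute names are not shown in this section.

```python
from dataclasses import dataclass


@dataclass
class SketchJob:  # hypothetical stand-in for ProductionJob
    name: str
    routing: list  # ordered server names; the attribute name is an assumption


def jobs_starting_at(pool, server):
    """Return pooled jobs whose first routing step is server, oldest first."""
    return [job for job in pool if job.routing and job.routing[0] == server]


pool = [
    SketchJob("j1", ["M1", "M2"]),
    SketchJob("j2", ["M2"]),
    SketchJob("j3", ["M1"]),
]
candidates = jobs_starting_at(pool, "M1")
```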

on_arrival

on_arrival(
    callback: Callable[[ProductionJob, PreShopPool], None],
) -> None

Subscribe a callback to be invoked each time a job arrives in the pool.

Callbacks are invoked synchronously during add(), before the SimPy new_job event fires. No env.run() priming is needed.

Parameters:

Name Type Description Default
callback Callable[[ProductionJob, PreShopPool], None]

Function called with (job, psp) when a job arrives.

required
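The ordering described above (callbacks first, event second) can be illustrated with a small stand-in pool. SketchPool is hypothetical and does not use SimPy; the trace entry labelled "new_job event" is only a placeholder for succeeding the real event.

```python
from collections import deque


class SketchPool:
    """Hypothetical stand-in for PreShopPool; not the library class."""

    def __init__(self):
        self._jobs = deque()
        self._callbacks = []
        self.trace = []  # records the order in which things happen

    def on_arrival(self, callback):
        self._callbacks.append(callback)

    def add(self, job):
        self._jobs.append(job)
        # Callbacks run synchronously, in registration order.
        for callback in self._callbacks:
            callback(job, self)
        # Placeholder for triggering the SimPy new_job event.
        self.trace.append(("new_job event", job))


pool = SketchPool()
pool.on_arrival(lambda job, psp: psp.trace.append(("callback", job)))
pool.add("job-1")
```

After the call to add, pool.trace lists the callback entry before the event entry.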

release

release(job: ProductionJob) -> None

Remove a job from the pool and release it to the shopfloor.

Convenience method combining remove() and shopfloor.add(). Use remove() instead if you want to discard a job without releasing it.

Parameters:

Name Type Description Default
job ProductionJob

The job to release from the pool to the shopfloor.

required

Raises:

Type Description
ValueError

If the job is not found in the pool.

remove

remove(
    *, job: ProductionJob | None = None
) -> ProductionJob

Remove a job from the pool and record its exit timestamp.

Supports two modes: FIFO removal (default) or specific job removal. Sets job.psp_exit_at to the current simulation time before returning.

Parameters:

Name Type Description Default
job ProductionJob | None

The specific job to remove. If None, removes the oldest job (FIFO).

None

Returns:

Type Description
ProductionJob

The removed job with its psp_exit_at timestamp updated.

Raises:

Type Description
ValueError

If a specific job is requested but not found in the pool.
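The two removal modes can be sketched on a plain deque. This is an assumption-laden illustration: the free function remove, the now argument, and the exit_times dict (standing in for job.psp_exit_at) are hypothetical, and the behaviour of the real method on an empty pool is not documented in this section.

```python
from collections import deque


def remove(pool, now, exit_times, job=None):
    """Remove job (or the oldest job when job is None) and record its exit time."""
    if job is None:
        job = pool.popleft()  # FIFO mode: take the oldest job
    else:
        pool.remove(job)      # deque.remove raises ValueError if the job is absent
    exit_times[job] = now     # stands in for setting job.psp_exit_at
    return job


pool = deque(["a", "b", "c"])
exit_times = {}
first = remove(pool, 5.0, exit_times)            # FIFO removal
picked = remove(pool, 6.0, exit_times, job="c")  # specific job removal
```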

Runner

Bases: Generic[S, T]

Manage repeated simulations with configurable builder and seeds.

The builder callable should accept an env: Environment parameter.

When running in parallel, each simulation instance can write to its own log file.

builder instance-attribute

builder = builder

extract_fn instance-attribute

extract_fn = extract_fn

log_dir instance-attribute

log_dir = log_dir

log_format instance-attribute

log_format: Literal['text', 'json'] = log_format

n_jobs instance-attribute

n_jobs = n_jobs

parallel instance-attribute

parallel = parallel

progress instance-attribute

progress = progress

seeds instance-attribute

seeds = seeds

__init__

__init__(
    *,
    builder: Builder[S],
    seeds: Sequence[int],
    parallel: bool = False,
    progress: bool | None = None,
    extract_fn: Callable[[S], T],
    n_jobs: int | None = None,
    log_dir: Path | None = None,
    log_format: Literal["text", "json"] = "text",
) -> None

Initialize the runner.

Parameters:

Name Type Description Default
builder Builder[S]

Callable that accepts env: Environment and returns a system

required
seeds Sequence[int]

Sequence of random seeds for each simulation run

required
parallel bool

Whether to run simulations in parallel using multiprocessing

False
progress bool | None

Whether to show a progress bar (None = auto, based on stderr TTY)

None
extract_fn Callable[[S], T]

Function to extract results from the system after simulation

required
n_jobs int | None

Number of parallel workers (defaults to CPU count)

None
log_dir Path | None

Optional directory for per-simulation log files. Each simulation creates a file named sim_XXXX_seed_YYYY.log.

None
log_format Literal['text', 'json']

Log output format ("text" or "json")

'text'

_run_single

_run_single(args: tuple[int, int, float]) -> tuple[int, T]

Run a single simulation.

Parameters:

Name Type Description Default
args tuple[int, int, float]

Tuple of (run_id, seed, until)

required

Returns:

Type Description
tuple[int, T]

Tuple of (run_id, extracted result)

run

run(until: float) -> list[T]

Run all simulations.

Parameters:

Name Type Description Default
until float

Simulation end time

required

Returns:

Type Description
list[T]

List of extracted results, one per seed
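The builder / seeds / extract_fn pattern can be sketched sequentially with the standard library alone. This is not the Runner implementation: ToySystem and run_replications are hypothetical, and the toy builder takes a seed directly, whereas the real builder receives an Environment.

```python
import random


class ToySystem:
    """Hypothetical system: counts Poisson arrivals up to a time horizon."""

    def __init__(self, seed):
        self.rng = random.Random(seed)  # independent random stream per replication
        self.arrivals = 0

    def run(self, until):
        clock = self.rng.expovariate(1.0)
        while clock <= until:
            self.arrivals += 1
            clock += self.rng.expovariate(1.0)


def run_replications(builder, seeds, extract_fn, until):
    """Build, run, and extract one result per seed, preserving seed order."""
    results = []
    for seed in seeds:
        system = builder(seed)
        system.run(until)
        results.append(extract_fn(system))
    return results


counts = run_replications(
    ToySystem, seeds=[1, 2, 3], extract_fn=lambda s: s.arrivals, until=100.0
)
```

Re-running with the same seeds reproduces the same list, which is the property that makes seeded replications comparable across experiments.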

Extension points

OperationHook

Bases: Protocol

Hook called before or after each operation.

Hooks may be plain synchronous functions (returning None) or generator-based (yielding SimPy events). Both styles can coexist in the same hook list and execute in registration order.

Examples:

A synchronous logging hook:

def log_hook(job, server, op_index, processing_time):
    print(f"t={server.env.now}: {job.sku} op {op_index} on {server}")

A generator hook that adds setup time:

def setup_time_hook(job, server, op_index, processing_time):
    setup = 2.0 if job.sku.startswith("COMPLEX") else 0.5
    yield server.env.timeout(setup)

shopfloor = ShopFloor(env=env, on_before_operation=setup_time_hook)
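One way a caller could support both hook styles in a single list is to check whether each hook's return value is a generator and, if so, delegate to it. The sketch below is an assumption about the mechanism, not the ShopFloor source; run_hooks is hypothetical and the yielded tuple is a placeholder for a SimPy event.

```python
import inspect


def run_hooks(hooks, job, server, op_index, processing_time):
    """Run hooks in registration order, forwarding events from generator hooks."""
    for hook in hooks:
        result = hook(job, server, op_index, processing_time)
        if inspect.isgenerator(result):
            # Generator hook: pass each yielded event up to the caller.
            yield from result
        # A plain hook returned None and has already finished.


calls = []


def log_hook(job, server, op_index, processing_time):
    calls.append(("log", op_index))


def setup_hook(job, server, op_index, processing_time):
    calls.append(("setup", op_index))
    yield ("timeout", 0.5)  # placeholder for server.env.timeout(0.5)


events = list(run_hooks([log_hook, setup_hook], "job", "server", 0, 2.0))
```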

WIPStrategy

Bases: Protocol

Strategy for calculating work-in-progress (WIP).

WIP strategies define how processing times are accumulated when jobs enter the shop floor and how they are decremented as operations complete.

Two built-in strategies are provided:

  - StandardWIPStrategy: Full processing time per server
  - CorrectedWIPStrategy: Position-discounted WIP (1/1, 1/2, 1/3, ...)

StandardWIPStrategy

Default WIP strategy: full processing time added per server.

When a job enters the shop floor, the full processing time for each operation is added to the corresponding server's WIP. When an operation completes, only that operation's processing time is decremented.
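A minimal sketch of this bookkeeping, assuming a job is described by a list of (server, processing time) pairs. The function names enter and complete are hypothetical and do not reflect the strategy's actual method names.

```python
from collections import defaultdict


def enter(wip, routing):
    """Add the full processing time of every operation to its server's WIP."""
    for server, processing_time in routing:
        wip[server] += processing_time


def complete(wip, routing, op_index):
    """Subtract only the completed operation's processing time."""
    server, processing_time = routing[op_index]
    wip[server] -= processing_time


wip = defaultdict(float)
routing = [("M1", 6.0), ("M2", 4.0), ("M3", 3.0)]

enter(wip, routing)
after_entry = dict(wip)
complete(wip, routing, 0)
after_first_op = dict(wip)
```

Downstream servers M2 and M3 carry the job's full load from the moment it enters, which is the effect the corrected strategy discounts.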

CorrectedWIPStrategy

Position-discounted WIP strategy.

Processing times are discounted by operation position:

  - 1st operation: full time (1/1)
  - 2nd operation: half time (1/2)
  - 3rd operation: third time (1/3)
  - etc.

As operations complete, remaining operations' WIP values are adjusted upward to reflect their new position in the routing.

This strategy provides a more balanced view of workload when jobs have long routings, preventing downstream servers from appearing overloaded due to jobs that haven't reached them yet.
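The discounting can be worked through on a three-operation job. The helper below is a hypothetical sketch; it assumes that "new position" means the operation's index among the operations still remaining, which is one reading of the description above.

```python
def corrected_contributions(processing_times, completed=0):
    """WIP contribution of each remaining operation, discounted by position."""
    remaining = processing_times[completed:]
    # Position 1 counts in full, position 2 at one half, position 3 at one third.
    return [p / position for position, p in enumerate(remaining, start=1)]


times = [6.0, 4.0, 3.0]
at_entry = corrected_contributions(times)                 # 6/1, 4/2, 3/3
after_first = corrected_contributions(times, completed=1)  # 4/1, 3/2
```

Once the first operation completes, the second operation's contribution rises from 2.0 to 4.0 and the third from 1.0 to 1.5, matching the upward adjustment described above.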

MetricsCollector

Bases: Protocol

Collector for job completion metrics.

Metrics collectors receive completed jobs and can compute any desired performance metrics. The built-in EMAMetricsCollector computes exponential moving averages for common metrics.

Example

A simple throughput collector:

class ThroughputCollector:
    def __init__(self):
        self.count = 0
        self.tardy = 0

    def record(self, job):
        self.count += 1
        if job.lateness > 0:
            self.tardy += 1

collector = ThroughputCollector()
shopfloor = ShopFloor(env=env, metrics_collector=collector)
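For comparison, an exponential moving average can be maintained with the same record(job) interface. This is a hypothetical sketch and not the built-in EMAMetricsCollector; the class name, the alpha parameter, and the choice to seed the average with the first observation are all assumptions.

```python
from types import SimpleNamespace


class EMALatenessSketch:
    """Hypothetical collector tracking an EMA of job lateness."""

    def __init__(self, alpha=0.1):
        self.alpha = alpha
        self.ema_lateness = None  # undefined until the first job completes

    def record(self, job):
        x = job.lateness
        if self.ema_lateness is None:
            self.ema_lateness = x  # seed with the first observation
        else:
            self.ema_lateness = self.alpha * x + (1 - self.alpha) * self.ema_lateness


collector = EMALatenessSketch(alpha=0.5)
for lateness in (4.0, 2.0, 1.0):
    collector.record(SimpleNamespace(lateness=lateness))
```

A larger alpha makes the average react faster to recent jobs; a smaller one smooths out noise.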

Dispatcher

Bases: Protocol

Reference protocol showing the full dispatcher interface.

All methods are optional at runtime — attach_dispatcher wires only those that are present and callable on the dispatcher object.

This protocol is NOT runtime-checkable. It exists for documentation and IDE support. Partial implementations are explicitly supported.
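Wiring only the methods that are present and callable can be sketched with getattr and callable. The code below is illustrative only: wire_optional, the registry dict, and the method names on_job_arrival, on_operation_done, and select_next are hypothetical and are not taken from the Dispatcher protocol or from attach_dispatcher.

```python
def wire_optional(registry, dispatcher, method_names):
    """Register each named method that exists and is callable; skip the rest."""
    wired = []
    for name in method_names:
        method = getattr(dispatcher, name, None)
        if callable(method):
            registry.setdefault(name, []).append(method)
            wired.append(name)
    return wired


class PartialDispatcher:
    """Implements one hypothetical method; another name is a non-callable value."""

    on_operation_done = None  # present but not callable, so it is skipped

    def on_job_arrival(self, job):
        return f"arrived:{job}"


registry = {}
wired = wire_optional(
    registry,
    PartialDispatcher(),
    ["on_job_arrival", "on_operation_done", "select_next"],
)
```

A partial implementation like PartialDispatcher is accepted without error, which is the behaviour the protocol description calls out.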