Core API
The core of Simulatte is a small set of cooperating objects: Environment drives the SimPy
clock and carries the logger; ShopFloor is the central orchestrator that tracks WIP, routes
jobs, and fires hooks; ProductionJob represents a unit of work that moves through a sequence
of Server resources via the Router; PreShopPool holds jobs before they are released to
the floor; and Runner runs multiple simulation replications with independent random seeds.
See the architecture diagram for how these objects interact.
Core objects
Environment
Bases:
Thin wrapper around simpy.Environment with integrated logging.
Each environment has its own logger that:
- Automatically includes simulation time in log output
- Supports JSON or text output format
- Maintains an in-memory history buffer
- Supports per-component filtering
_logger
instance-attribute
_logger = SimLogger(
env=self,
log_file=log_file,
log_format=log_format,
history_size=log_history_size,
db_path=log_db_path,
)
log_history
property
log_history: EventHistoryBuffer
Access the event history buffer.
Returns:
| Type | Description |
|---|---|
| EventHistoryBuffer | The EventHistoryBuffer containing recent log events. Use .query() to filter events by level, component, or time range. |
Example
env.log_history.query(level="ERROR", since=100.0)
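To make the filtering semantics concrete, here is a minimal stand-alone sketch of a bounded history buffer with a keyword-only query() method. It is not Simulatte's EventHistoryBuffer: the LogEvent and HistoryBuffer names, the event fields, and the inclusive treatment of since are all assumptions made for illustration.

```python
from __future__ import annotations

from collections import deque
from dataclasses import dataclass


@dataclass(frozen=True)
class LogEvent:
    """Hypothetical event record; the real event type may differ."""

    time: float
    level: str
    component: str
    message: str


class HistoryBuffer:
    """Illustrative bounded buffer, not the library's implementation."""

    def __init__(self, maxlen: int = 1000) -> None:
        # A deque with maxlen drops the oldest entry once it is full.
        self._events: deque[LogEvent] = deque(maxlen=maxlen)

    def append(self, event: LogEvent) -> None:
        self._events.append(event)

    def query(
        self,
        *,
        level: str | None = None,
        component: str | None = None,
        since: float | None = None,
    ) -> list[LogEvent]:
        # Each filter is optional; None means "do not filter on this field".
        # Assumption: `since` is inclusive.
        return [
            e
            for e in self._events
            if (level is None or e.level == level)
            and (component is None or e.component == component)
            and (since is None or e.time >= since)
        ]


buffer = HistoryBuffer(maxlen=2)
buffer.append(LogEvent(50.0, "ERROR", "Server", "too early"))
buffer.append(LogEvent(120.0, "INFO", "ShopFloor", "job released"))
buffer.append(LogEvent(150.0, "ERROR", "Server", "breakdown"))
recent_errors = buffer.query(level="ERROR", since=100.0)
```

With maxlen=2 the first event is evicted before the query runs, which mirrors how a bounded history only ever sees recent events.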
logger
property
logger: SimLogger
Access the underlying SimLogger for advanced configuration.
Use this to enable/disable component-level filtering:
>>> env.logger.disable_component("Server")
>>> env.logger.enable_component("ShopFloor")
__enter__
__enter__() -> Environment
__exit__
__exit__(exc_type: object, exc: object, tb: object) -> None
__init__
__init__(
*,
log_file: str | Path | None = None,
log_format: Literal["text", "json"] = "text",
log_history_size: int = 1000,
log_db_path: str | Path | None = None,
) -> None
Initialize the simulation environment.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| log_file | str \| Path \| None | Optional file path for log output (defaults to stderr) | None |
| log_format | Literal["text", "json"] | Output format ("text" or "json") | 'text' |
| log_history_size | int | Maximum number of events to keep in history buffer | 1000 |
| log_db_path | str \| Path \| None | Optional SQLite database path for persistent event storage. If provided, events are stored in both memory buffer and SQLite. | None |
close
close() -> None
Release logger resources associated with this environment.
debug
debug(
message: str,
*,
component: str | None = None,
**extra: Any,
) -> None
Log a debug message with simulation time context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | str | The log message | required |
| component | str \| None | Optional component class name for filtering (e.g., "Server") | None |
| **extra | Any | Additional structured data to include in the log | {} |
error
error(
message: str,
*,
component: str | None = None,
**extra: Any,
) -> None
Log an error message with simulation time context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | str | The log message | required |
| component | str \| None | Optional component class name for filtering (e.g., "Server") | None |
| **extra | Any | Additional structured data to include in the log | {} |
info
info(
message: str,
*,
component: str | None = None,
**extra: Any,
) -> None
Log an info message with simulation time context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | str | The log message | required |
| component | str \| None | Optional component class name for filtering (e.g., "Server") | None |
| **extra | Any | Additional structured data to include in the log | {} |
step
step() -> None
Process the next event in the queue.
If the user interrupts the simulation with a KeyboardInterrupt, a StopSimulation exception is raised to pause the simulation gracefully.
warning
warning(
message: str,
*,
component: str | None = None,
**extra: Any,
) -> None
Log a warning message with simulation time context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | str | The log message | required |
| component | str \| None | Optional component class name for filtering (e.g., "Server") | None |
| **extra | Any | Additional structured data to include in the log | {} |
ShopFloor
Central orchestrator for job flow through a manufacturing simulation.
The ShopFloor manages the complete lifecycle of production jobs as they move through a sequence of servers. It tracks work-in-progress (WIP) at each server, maintains metrics for performance monitoring, and signals events when jobs complete processing steps or finish entirely.
Extensibility is provided through composition:
- on_before_operation / on_after_operation: Hooks for custom logic at each operation
- wip_strategy: Pluggable WIP calculation
- metrics_collector: Pluggable metrics recording
- time_series_collector: Pluggable time-series data collection
- on_job_finished: Callbacks when jobs complete
- material_coordinator: Optional material delivery coordination
Attributes:
| Name | Type | Description |
|---|---|---|
| env | Environment | The simulation environment providing time and process management. |
| material_coordinator | MaterialCoordinator \| None | Optional coordinator for material delivery. |
| servers | list[Server] | List of servers registered with this shop floor. |
| jobs | set[ProductionJob] | Set of jobs currently being processed on the shop floor. |
| jobs_done | list[ProductionJob] | List of completed jobs in order of completion. |
| wip | dict[Server, float] | Dictionary mapping each server to its current WIP value. |
| total_time_in_system | float | Cumulative time spent by all completed jobs. |
| job_processing_end | | SimPy event triggered when any job finishes processing at a server. Recreated after each trigger. |
| job_finished_event | | SimPy event triggered when any job completes its entire routing. Recreated after each trigger. |
| maximum_wip_value | float | Peak total WIP observed during simulation. |
| maximum_shopfloor_jobs | int | Peak number of concurrent jobs observed. |
Example
Basic usage with hooks:
from simulatte import Environment, Server, ProductionJob, ShopFloor

def setup_hook(job, server, op_index, pt):
    # Hooks are generators: yield a SimPy event to consume simulation time.
    yield server.env.timeout(1.0)  # 1s setup time

env = Environment()
shop_floor = ShopFloor(env=env, on_before_operation=setup_hook)
server = Server(env=env, capacity=1, shopfloor=shop_floor)
job = ProductionJob(
    env=env, sku="PART-A", servers=[server],
    processing_times=[10.0], due_date=100.0,
)
shop_floor.add(job)
env.run()
_after_operation
instance-attribute
_after_operation: list[OperationHook] = _normalize_hooks(
on_after_operation
)
_before_operation
instance-attribute
_before_operation: list[OperationHook] = _normalize_hooks(
on_before_operation
)
_metrics_collector
instance-attribute
_metrics_collector: MetricsCollector | None = (
EMAMetricsCollector(alpha=ema_alpha)
)
_on_job_finished
instance-attribute
_on_job_finished: list[Callable[[ProductionJob], None]] = (
_normalize_callbacks(on_job_finished)
)
_processing_end_callbacks
instance-attribute
_processing_end_callbacks: list[
Callable[[ProductionJob, Server], None]
] = []
_time_series_collector
instance-attribute
_time_series_collector: TimeSeriesCollector | None = (
time_series_collector
)
_wip_strategy
instance-attribute
_wip_strategy: WIPStrategy = (
wip_strategy
if wip_strategy is not None
else StandardWIPStrategy()
)
average_time_in_system
property
average_time_in_system: float
Average time jobs spend in the system from first server entry to completion.
Calculated as total_time_in_system divided by the number of completed jobs. Returns 0.0 if no jobs have completed yet.
Returns:
| Type | Description |
|---|---|
| float | Average time in system for all completed jobs, or 0.0 if none completed. |
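The calculation described above can be sketched as a small free function. This is an illustration of the documented rule (total divided by completed count, with a 0.0 guard), not the property's actual source; the function name and its two explicit arguments are assumptions standing in for total_time_in_system and len(jobs_done).

```python
def average_time_in_system(total_time_in_system: float, completed_jobs: int) -> float:
    """Mean time in system, guarding against division by zero."""
    if completed_jobs == 0:
        # No job has finished yet, so the average is reported as 0.0.
        return 0.0
    return total_time_in_system / completed_jobs


# Three completed jobs that together spent 30 time units in the system.
mean_time = average_time_in_system(30.0, 3)
```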
env
instance-attribute
env = env
job_finished_event
instance-attribute
job_finished_event = event()
job_processing_end
instance-attribute
job_processing_end = event()
jobs
instance-attribute
jobs: set[ProductionJob] = set()
jobs_done
instance-attribute
jobs_done: list[ProductionJob] = []
material_coordinator
instance-attribute
material_coordinator = material_coordinator
maximum_shopfloor_jobs
instance-attribute
maximum_shopfloor_jobs: int = 0
maximum_wip_value
instance-attribute
maximum_wip_value: float = 0.0
metrics_collector
property
metrics_collector: MetricsCollector | None
Collector called when jobs complete (or None if disabled).
servers
instance-attribute
servers: list[Server] = []
time_series_collector
property
time_series_collector: TimeSeriesCollector | None
Collector for time-series data (or None if disabled).
total_time_in_system
instance-attribute
total_time_in_system: float = 0.0
wip
instance-attribute
wip: dict[Server, float] = {}
wip_strategy
property
wip_strategy: WIPStrategy
The current WIP strategy used by the shopfloor.
__init__
__init__(
*,
env: Environment,
ema_alpha: float = 0.01,
material_coordinator: MaterialCoordinator | None = None,
wip_strategy: WIPStrategy | None = None,
metrics_collector: MetricsCollector
| None
| object = _DEFAULT_METRICS_COLLECTOR,
collect_time_series: bool = False,
time_series_collector: TimeSeriesCollector
| None = None,
on_before_operation: OperationHook
| Sequence[OperationHook]
| None = None,
on_after_operation: OperationHook
| Sequence[OperationHook]
| None = None,
on_job_finished: Callable[[ProductionJob], None]
| Sequence[Callable[[ProductionJob], None]]
| None = None,
) -> None
Initialize a new ShopFloor instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| env | Environment | The simulation environment that provides time management, event scheduling, and process coordination. | required |
| ema_alpha | float | Smoothing factor for the default EMAMetricsCollector. Must be in range (0, 1]. Ignored if a custom metrics_collector is provided. Defaults to 0.01. | 0.01 |
| material_coordinator | MaterialCoordinator \| None | Optional coordinator for handling material delivery to servers. When provided, the shop floor will ensure materials are delivered before processing begins at each operation, implementing FIFO blocking behavior. | None |
| wip_strategy | WIPStrategy \| None | Strategy for WIP calculation. Defaults to StandardWIPStrategy which uses full processing times. | None |
| metrics_collector | MetricsCollector \| None \| object | Collector for job completion metrics. Defaults to EMAMetricsCollector. Pass None to disable metrics. | _DEFAULT_METRICS_COLLECTOR |
| collect_time_series | bool | If True and time_series_collector is None, creates a DefaultTimeSeriesCollector for WIP, job count, throughput, and lateness tracking. Defaults to False. | False |
| time_series_collector | TimeSeriesCollector \| None | Collector for time-series data. If provided, overrides collect_time_series. Pass None to disable time-series collection. Defaults to None. | None |
| on_before_operation | OperationHook \| Sequence[OperationHook] \| None | Hook(s) called after acquiring server but before material delivery and processing. Can be a single hook or list. | None |
| on_after_operation | OperationHook \| Sequence[OperationHook] \| None | Hook(s) called after processing completes but before signaling. Can be a single hook or list. | None |
| on_job_finished | Callable[[ProductionJob], None] \| Sequence[Callable[[ProductionJob], None]] \| None | Callback(s) called when a job completes its entire routing. Can be a single callable or list. | None |
_fire_processing_end_callbacks
_fire_processing_end_callbacks(
job: ProductionJob, server: Server
) -> None
Invoke on_processing_end callbacks after server release.
Called after the with server.request() context exits, meaning
servers_exit_at is stamped and the server resource is freed. This
is the correct point for release-policy decisions that inspect
server state or job.previous_server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job | ProductionJob | The job that just completed processing. | required |
| server | Server | The server where processing completed. | required |
_normalize_callbacks
staticmethod
_normalize_callbacks(
callbacks: Callable[[ProductionJob], None]
| Sequence[Callable[[ProductionJob], None]]
| None,
) -> list[Callable[[ProductionJob], None]]
Normalize callback parameter to a list.
_normalize_hooks
staticmethod
_normalize_hooks(
hooks: OperationHook | Sequence[OperationHook] | None,
) -> list[OperationHook]
Normalize hook parameter to a list.
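Because the constructor accepts a single hook, a sequence of hooks, or None, some normalization step is needed before hooks can be iterated uniformly. The sketch below shows one plausible way to do that; it is an assumption about the general technique, not a copy of _normalize_hooks, and the Hook alias and normalize_hooks name are hypothetical.

```python
from __future__ import annotations

from collections.abc import Callable, Sequence
from typing import Any

# Hypothetical alias standing in for the library's OperationHook type.
Hook = Callable[..., Any]


def normalize_hooks(hooks: Hook | Sequence[Hook] | None) -> list[Hook]:
    """Return a fresh list regardless of how the hooks were supplied."""
    if hooks is None:
        return []
    if callable(hooks):
        # A single hook is wrapped so callers can always iterate.
        return [hooks]
    # Copy the sequence so later registrations do not mutate the caller's object.
    return list(hooks)


def log_hook(*args: Any) -> None:
    """Placeholder hook used only for this illustration."""


no_hooks = normalize_hooks(None)
one_hook = normalize_hooks(log_hook)
two_hooks = normalize_hooks((log_hook, log_hook))
```

Returning a new list each time keeps post-construction registrations (which append to the list) from leaking back into a sequence the caller still holds.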
add
add(job: ProductionJob) -> None
Release a job from the Pre-Shop Pool onto the shop floor.
This method performs the following actions:
1. Adds the job to the active jobs set
2. Updates WIP values via the configured WIP strategy
3. Records the PSP exit timestamp on the job
4. Spawns the main processing coroutine for the job
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job | ProductionJob | The production job to release onto the shop floor. The job's routing must contain valid server references. | required |
Note
This method modifies the job's psp_exit_at timestamp and spawns an async process. The job will begin queuing at its first server immediately after this call.
attach_dispatcher
attach_dispatcher(
dispatcher: object, *, psp: PreShopPool | None = None
) -> None
Wire a dispatcher object's hook methods to this shopfloor.
Detects which hook methods exist on the dispatcher and registers only those that are callable. This allows partial implementations where a dispatcher only handles a subset of events.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dispatcher | object | Object with any combination of on_before_operation, on_after_operation, on_job_finished, on_processing_end, and on_psp_arrival methods. | required |
| psp | PreShopPool \| None | If provided and dispatcher has on_psp_arrival, registers an arrival subscription on the PSP. | None |
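The "register only what is callable" behaviour can be illustrated with plain duck typing. The sketch below is a stand-alone approximation: find_hooks and PartialDispatcher are hypothetical names, and the real attach_dispatcher may detect or register hooks differently.

```python
from __future__ import annotations

from collections.abc import Callable

# Hook method names listed in the parameter description above.
HOOK_NAMES = (
    "on_before_operation",
    "on_after_operation",
    "on_job_finished",
    "on_processing_end",
    "on_psp_arrival",
)


def find_hooks(dispatcher: object) -> dict[str, Callable[..., object]]:
    """Collect the hook methods a dispatcher actually implements."""
    found: dict[str, Callable[..., object]] = {}
    for name in HOOK_NAMES:
        candidate = getattr(dispatcher, name, None)
        if callable(candidate):
            # Missing or non-callable attributes are silently skipped.
            found[name] = candidate
    return found


class PartialDispatcher:
    """Hypothetical dispatcher handling only a subset of events."""

    on_before_operation = None  # present but not callable, so ignored

    def on_job_finished(self, job: object) -> None:
        print(f"finished: {job}")


hooks = find_hooks(PartialDispatcher())
```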
main
main(job: ProductionJob) -> ProcessGenerator
Execute the main processing loop for a job through all its servers.
This generator manages the complete lifecycle of a job as it moves through its routing. For each server in the job's routing, it:
- Requests and acquires the server resource (queuing if busy)
- Executes on_before_operation hooks
- If a MaterialCoordinator is configured, waits for material delivery
- Processes the job for the specified duration
- Updates WIP via the configured WIP strategy
- Executes on_after_operation hooks
- Signals processing completion (job_processing_end event + on_processing_end callbacks)
After all operations complete, it:
- Records the finish timestamp on the job
- Moves the job from active (jobs) to completed (jobs_done)
- Records metrics via the configured metrics collector
- Calls on_job_finished callbacks
- Signals job completion via signal_job_finished()
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job | ProductionJob | The production job to process through its routing. | required |
Yields:
| Type | Description |
|---|---|
| | SimPy events for server requests, hooks, material delivery, and processing. |
Note
This method is automatically spawned by add() and should not be called directly. It runs as a SimPy process until the job completes.
on_after_operation
on_after_operation(hook: OperationHook) -> None
Register a hook to run after each operation.
Hooks registered post-construction execute after any hooks passed via __init__, in registration order.
on_before_operation
on_before_operation(hook: OperationHook) -> None
Register a hook to run before each operation.
Hooks registered post-construction execute after any hooks passed via __init__, in registration order.
on_job_finished
on_job_finished(
callback: Callable[[ProductionJob], None],
) -> None
Register a callback for when a job completes its entire routing.
Callbacks registered post-construction execute after any callbacks passed via __init__, in registration order.
on_processing_end
on_processing_end(
callback: Callable[[ProductionJob, Server], None],
) -> None
Register a callback for when a job completes processing at any server.
Callbacks are invoked synchronously after the server is released
(servers_exit_at is stamped and job.previous_server is
available). This fires after each operation, not just when the
job finishes its entire routing.
Note
The SimPy job_processing_end event is succeeded earlier
(while the server is still held). These callbacks fire after
server release and always run before SimPy process-based
listeners resume.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | Callable[[ProductionJob, Server], None] | Function called with (job, server) after processing completes. | required |
set_metrics_collector
set_metrics_collector(
collector: MetricsCollector | None,
) -> None
Replace the shopfloor's metrics collector (or disable with None).
set_time_series_collector
set_time_series_collector(
collector: TimeSeriesCollector | None,
) -> None
Replace the shopfloor's time-series collector (or disable with None).
set_wip_strategy
set_wip_strategy(strategy: WIPStrategy) -> None
Replace the shopfloor's WIP strategy.
signal_job_finished
signal_job_finished(job: ProductionJob) -> None
Signal that a job has completed its entire routing.
This method triggers the job_finished_event with the completed job as the event value, notifying any waiting processes that a job has finished all operations. The event is then recreated for the next signal.
Unlike job_processing_end, this is only called once per job when it completes its final operation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job | ProductionJob | The job that just completed its entire routing. | required |
Example
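Counting completed jobs (inside a SimPy process generator):
completed = 0
while completed < target:
    # Read the attribute on every iteration: the event is recreated after each signal.
    job = yield shop_floor.job_finished_event
    completed += 1
    print(f"Job {job.id} completed. Total: {completed}")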
Server
Bases:
A server/workstation for job-shop simulation with queue and utilization tracking.
Server extends SimPy's PriorityResource to process jobs with priority-based queueing. It tracks queue lengths, utilization rates, and optionally records time-series data for visualization. When attached to a ShopFloor, the server is automatically registered and assigned an index for identification.
Dynamic priorities: queued jobs' priorities are refreshed before every
dispatch decision. sort_queue re-evaluates each queued request's
job.priority_policy and rewrites req.key; _trigger_put
(the SimPy hook invoked on both new-arrival and release paths) calls
sort_queue before delegating to SimPy. Callers may also invoke
sort_queue explicitly to observe the resulting order between
events. The cost per dispatch decision is one priority_policy call
per queued request; policies must be deterministic given (job, server) and the
current simulation state at call time.
_idx
instance-attribute
_idx = index(self)
_jobs
instance-attribute
_jobs: list[BaseJob] | None = (
[] if retain_job_history else None
)
_last_queue_level
instance-attribute
_last_queue_level: int = 0
_last_queue_level_timestamp
instance-attribute
_last_queue_level_timestamp: float = 0
_qt
instance-attribute
_qt: list[tuple[float, int]] | None = (
[] if collect_time_series else None
)
_queue_history
instance-attribute
_queue_history: dict[int, float] = defaultdict(float)
_ut
instance-attribute
_ut: list[tuple[float, float]] | None = (
[(0, 0.0)] if collect_time_series else None
)
average_queue_length
property
average_queue_length: float
Time-weighted average queue length over the simulation.
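A time-weighted average weights each queue level by how long the queue stayed at that level. Assuming a histogram shaped like _queue_history (queue level mapped to accumulated time), the computation can be sketched as follows; the function name is hypothetical and the real property may handle edge cases differently.

```python
def time_weighted_average(queue_history: dict[int, float]) -> float:
    """Average queue level, weighting each level by the time spent at it."""
    total_time = sum(queue_history.values())
    if total_time == 0:
        # Nothing has been observed yet.
        return 0.0
    weighted_sum = sum(level * duration for level, duration in queue_history.items())
    return weighted_sum / total_time


# The queue was empty for 5 time units and held 2 jobs for another 5.
average_length = time_weighted_average({0: 5.0, 2: 5.0})
```

A plain mean of the observed levels would ignore durations; here the two levels are held for equal time, so the average lands halfway between them.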
current_jobs
property
current_jobs: tuple[BaseJob, ...]
Jobs currently occupying active server slots (includes hook/material phases).
empty
property
empty: bool
Whether the queue is empty.
env
instance-attribute
env = env
idle_time
property
idle_time: float
Total time the server has been idle.
is_idle
property
is_idle: bool
Whether the server has no active users and an empty queue.
queueing_jobs
property
queueing_jobs: Iterable[BaseJob]
Iterator over jobs currently waiting in the queue.
utilization_rate
property
utilization_rate: float
Fraction of time the server has been busy (0 to 1).
worked_time
instance-attribute
worked_time: float = 0
__init__
__init__(
*,
env: Environment,
capacity: int,
shopfloor: ShopFloor | None = None,
collect_time_series: bool = False,
retain_job_history: bool = False,
) -> None
Initialize a server resource.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| env | Environment | The simulation environment. | required |
| capacity | int | Maximum number of jobs that can be processed simultaneously. | required |
| shopfloor | ShopFloor \| None | Optional ShopFloor for automatic registration. If provided, the server is added to the shopfloor's server list. | None |
| collect_time_series | bool | If True, record queue length and utilization over time for later visualization via plot_qt() and plot_ut(). | False |
| retain_job_history | bool | If True, maintain a list of all processed jobs. | False |
__repr__
__repr__() -> str
_trigger_put
_trigger_put(get_event: Release | None) -> None
Refresh queue priorities before SimPy iterates the put queue.
Overrides simpy.resources.base.BaseResource._trigger_put to
call sort_queue (which re-evaluates job.priority_policy
for every queued request and rewrites req.key) before delegating
to SimPy. SimPy invokes _trigger_put from two call sites:
simpy.resources.base.Put.__init__ (after a new arrival is
appended to put_queue) and as a callback on every Release event
(simpy.resources.base.Get.__init__ registers it). Refreshing
here therefore covers both the new-arrival and release dispatch paths.
_update_qt
_update_qt() -> None
Record current queue length to the time-series if collection is enabled.
_update_queue_history
_update_queue_history(_: Event | None) -> None
Update queue histogram and trigger time-series updates.
_update_ut
_update_ut() -> None
Record current utilization to the time-series if collection is enabled.
plot_qt
plot_qt() -> None
Display a step plot of queue length over simulation time.
Raises:
| Type | Description |
|---|---|
| | If time-series collection was not enabled at initialization. |
plot_ut
plot_ut() -> None
Display a step plot of utilization rate over simulation time.
Raises:
| Type | Description |
|---|---|
| | If time-series collection was not enabled at initialization. |
process_job
process_job(
job: BaseJob, processing_time: float
) -> ProcessGenerator
Simulate processing a job for a given duration.
This generator yields a timeout event for the processing duration and updates worked_time. Should be called within a request context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job | BaseJob | The job being processed. | required |
| processing_time | float | Duration of processing in simulation time units. | required |
Yields:
| Type | Description |
|---|---|
| | A SimPy timeout event for the processing duration. |
release
release(request: ServerPriorityRequest) -> Release
Release the server after job processing.
Records the job's exit time and updates utilization tracking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| request | ServerPriorityRequest | The ServerPriorityRequest to release. | required |
Returns:
| Type | Description |
|---|---|
| Release | A SimPy Release event. |
request
request(
*, job: BaseJob, preempt: bool = True
) -> ServerPriorityRequest
Request server access for a job with priority-based queueing.
Creates a priority request that enters the server queue. The request should be used as a context manager to ensure proper release.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job | BaseJob | The job requesting server access. | required |
| preempt | bool | If True, this request can preempt lower-priority jobs. | True |
Returns:
| Type | Description |
|---|---|
| ServerPriorityRequest | A ServerPriorityRequest to be yielded and used as a context manager. |
sort_queue
sort_queue() -> None
Refresh queued requests' priority keys and resort.
For each request in the queue, calls req.job.priority(req.server)
to obtain the current priority and rewrites req.key accordingly,
then sorts the queue in ascending order by the refreshed keys.
Called automatically before every dispatch decision via
_trigger_put. May also be invoked explicitly by user code
that has mutated priority_policy and wants to observe the new
order before the next dispatch event.
Note: req.priority is not refreshed; it remains the snapshot
taken at request construction. To inspect a queued job's current
priority, call req.job.priority(req.server) directly.
Requires that every queued request expose job, server,
time, and preempt attributes (which ServerPriorityRequest
does).
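The refresh-then-sort idea can be shown without SimPy. In the sketch below, Job and QueuedRequest are hypothetical stand-ins for the library's job and ServerPriorityRequest classes, servers are plain strings, and the key is simplified to (priority, arrival time); the real key layout may include more fields.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass
class Job:
    """Hypothetical job whose priority is computed on demand."""

    name: str
    priority_policy: Callable[[Job, str], float]

    def priority(self, server: str) -> float:
        return self.priority_policy(self, server)


@dataclass
class QueuedRequest:
    """Hypothetical queued request; `key` is what the queue is ordered by."""

    job: Job
    server: str
    time: float
    key: tuple = ()


def sort_queue(queue: list[QueuedRequest]) -> None:
    """Re-evaluate every queued job's priority, then sort ascending by key."""
    for req in queue:
        # One policy call per queued request, as described above.
        req.key = (req.job.priority(req.server), req.time)
    queue.sort(key=lambda req: req.key)


job_a = Job("a", lambda job, server: 5.0)
job_b = Job("b", lambda job, server: 1.0)
queue = [QueuedRequest(job_a, "M1", 0.0), QueuedRequest(job_b, "M1", 1.0)]

sort_queue(queue)
order_before = [req.job.name for req in queue]

# Mutating the policy only takes effect once the queue is refreshed again.
job_a.priority_policy = lambda job, server: 0.0
sort_queue(queue)
order_after = [req.job.name for req in queue]
```

Lower keys are served first here, matching the ascending sort described above; the arrival time acts as a tie-breaker in this simplified key.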
ProductionJob
Bases:
A production job that flows through servers with optional material requirements.
Production jobs represent manufacturing orders that require processing at one or more servers. They can optionally specify material requirements that must be delivered before processing can begin at each operation.
__slots__
class-attribute
instance-attribute
__slots__ = ('material_requirements',)
material_requirements
instance-attribute
material_requirements = material_requirements or {}
__init__
__init__(
*,
env: Environment,
sku: str,
servers: Sequence[Server],
processing_times: Sequence[float],
due_date: SimTime,
priority_policy: Callable[[Any, Server], float]
| None = None,
material_requirements: dict[int, dict[str, int]]
| None = None,
) -> None
Initialize a production job.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| env | Environment | The simulation environment. | required |
| sku | str | Job SKU identifier. | required |
| servers | Sequence[Server] | Sequence of servers in the job's routing. | required |
| processing_times | Sequence[float] | Processing time at each server. | required |
| due_date | SimTime | Target completion time. | required |
| priority_policy | Callable[[Any, Server], float] \| None | Optional function to compute priority at each server. | None |
| material_requirements | dict[int, dict[str, int]] \| None | Optional mapping from operation index to required materials. Format: {op_index: {product_name: quantity}}. Example: {0: {"steel": 2, "bolts": 10}} means operation 0 requires 2 units of steel and 10 bolts to be delivered before processing. | None |
__repr__
__repr__() -> str
get_materials_for_operation
get_materials_for_operation(
op_index: int,
) -> dict[str, int]
Get material requirements for a specific operation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| op_index | int | The operation index (0-based). | required |
Returns:
| Type | Description |
|---|---|
| dict[str, int] | Dictionary mapping product names to required quantities, or empty dict if no materials required. |
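The lookup behaviour is easy to picture with the material_requirements format documented above. This free-function sketch is an assumption about the semantics (a plain dictionary lookup with an empty-dict fallback), not the method's actual body.

```python
def get_materials_for_operation(
    material_requirements: dict[int, dict[str, int]],
    op_index: int,
) -> dict[str, int]:
    """Return the materials needed at op_index, or {} when none are listed."""
    return material_requirements.get(op_index, {})


# Operation 0 needs 2 units of steel and 10 bolts; operation 1 needs nothing.
requirements = {0: {"steel": 2, "bolts": 10}}
first_op = get_materials_for_operation(requirements, 0)
second_op = get_materials_for_operation(requirements, 1)
```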
Router
Stochastic job generator that routes jobs through the simulation.
The Router continuously generates ProductionJob instances at random intervals determined by the inter-arrival distribution. Each job is assigned a randomly selected SKU, a routing through servers, and processing times sampled from configured distributions.
Jobs are routed based on system configuration:
- Push system (psp=None): Jobs go directly to the ShopFloor
- Pull system (psp set): Jobs queue in the PreShopPool until released
Upon instantiation, the Router registers itself as a SimPy process that runs for the duration of the simulation.
due_date_offset_distribution
instance-attribute
due_date_offset_distribution = due_date_offset_distribution
due_date_rule
instance-attribute
due_date_rule = due_date_rule
env
instance-attribute
env = env
inter_arrival_distribution
instance-attribute
inter_arrival_distribution = inter_arrival_distribution
priority_policies
instance-attribute
priority_policies = priority_policies
psp
instance-attribute
psp = psp
servers
instance-attribute
servers = servers
shopfloor
instance-attribute
shopfloor = shopfloor
sku_distributions
instance-attribute
sku_distributions = sku_distributions
sku_routings
instance-attribute
sku_routings = sku_routings
sku_service_times
instance-attribute
sku_service_times = sku_service_times
__init__
__init__(
*,
env: Environment,
shopfloor: ShopFloor,
servers: Sequence[Server],
psp: PreShopPool | None,
inter_arrival_distribution: Sampler[float],
sku_distributions: DiscreteDistribution[str, float],
sku_routings: dict[str, Callable[[], Sequence[Server]]],
sku_service_times: dict[
str, DiscreteDistribution[Server, Sampler[float]]
],
due_date_offset_distribution: dict[str, Sampler[float]],
priority_policies: Callable[
[ProductionJob, Server], float
]
| None = None,
due_date_rule: dict[
str, Callable[[Sequence[float]], float]
]
| None = None,
) -> None
Initialize the Router and start the job generation process.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| env | Environment | The simulation environment. | required |
| shopfloor | ShopFloor | ShopFloor instance managing job flow and WIP tracking. | required |
| servers | Sequence[Server] | Sequence of all available Server instances in the system. | required |
| psp | PreShopPool \| None | PreShopPool for pull systems, or None for push systems where jobs go directly to the ShopFloor. | required |
| inter_arrival_distribution | Sampler[float] | Callable returning the time until the next job arrival. | required |
| sku_distributions | DiscreteDistribution[str, float] | Mapping from SKU names to probability weights for random SKU selection. | required |
| sku_routings | dict[str, Callable[[], Sequence[Server]]] | Mapping from SKU to a callable that returns the server routing sequence for that SKU. | required |
| sku_service_times | dict[str, DiscreteDistribution[Server, Sampler[float]]] | Nested mapping. | required |
| due_date_offset_distribution | dict[str, Sampler[float]] | Mapping from SKU to a callable returning the offset used to compute the due date. | required |
| priority_policies | Callable[[ProductionJob, Server], float] \| None | Optional callable. | None |
| due_date_rule | dict[str, Callable[[Sequence[float]], float]] \| None | Optional per-SKU mapping to a callable. | None |
Example
router = Router(
    env=env,
    shopfloor=shop_floor,
    servers=servers,
    psp=None,  # Push system
    inter_arrival_distribution=lambda: random.expovariate(1.0),
    sku_distributions={"F1": 1.0},
    sku_routings={"F1": lambda: servers},
    sku_service_times={"F1": {s: lambda: 2.0 for s in servers}},
    due_date_offset_distribution={"F1": lambda: 30.0},
)
generate_job
generate_job() -> Generator[Timeout, None, NoReturn]
Infinite generator that creates and routes jobs at random intervals.
This method runs as a SimPy process for the simulation's duration. On each iteration it:
- Waits for the inter-arrival time
- Samples a random SKU based on configured weights
- Generates a routing and processing times for the selected SKU
- Creates a ProductionJob with computed due date
- Routes the job to PSP (if configured) or directly to ShopFloor
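The sampling steps above can be sketched with the standard library alone. Everything here is illustrative: sample_job is a hypothetical helper, servers are plain strings, the job is a dictionary rather than a ProductionJob, and the due date is assumed to be the current time plus the sampled offset.

```python
import random
from typing import Callable


def sample_job(
    rng: random.Random,
    now: float,
    sku_weights: dict[str, float],
    routings: dict[str, Callable[[], list[str]]],
    service_times: dict[str, dict[str, Callable[[], float]]],
    due_date_offsets: dict[str, Callable[[], float]],
) -> dict[str, object]:
    """Build one job description from the configured samplers."""
    # Pick a SKU with probability proportional to its weight.
    skus = list(sku_weights)
    sku = rng.choices(skus, weights=[sku_weights[s] for s in skus], k=1)[0]
    # Generate the routing, then one processing time per routed server.
    routing = list(routings[sku]())
    processing_times = [service_times[sku][server]() for server in routing]
    # Assumption: due date = arrival time + sampled offset.
    due_date = now + due_date_offsets[sku]()
    return {
        "sku": sku,
        "servers": routing,
        "processing_times": processing_times,
        "due_date": due_date,
    }


servers = ["M1", "M2"]
job = sample_job(
    random.Random(42),
    now=10.0,
    sku_weights={"F1": 1.0},
    routings={"F1": lambda: servers},
    service_times={"F1": {s: (lambda: 2.0) for s in servers}},
    due_date_offsets={"F1": lambda: 30.0},
)
```

In the real Router this step would sit inside an infinite loop that first waits for the sampled inter-arrival time and then hands the job to the PreShopPool or the ShopFloor.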
Yields:
| Type | Description |
|---|---|
| simpy.Timeout | Pauses the process until the next job arrival. |
PreShopPool
Buffer queue for jobs awaiting shopfloor release.
A pure container with no built-in release logic. Release policies are
implemented as external SimPy processes using the trigger functions from
simulatte.policies.triggers.
The pool provides a new_job event that external processes can monitor
to react immediately when jobs arrive (e.g., for starvation avoidance).
Example
from simulatte.policies.triggers import periodic_trigger, on_arrival_trigger

psp = PreShopPool(env=env, shopfloor=shopfloor)
env.process(periodic_trigger(psp, 1.0, my_release_fn))
env.process(on_arrival_trigger(psp, my_on_arrival_fn))
_arrival_callbacks
instance-attribute
_arrival_callbacks: list[
Callable[[ProductionJob, PreShopPool], None]
] = []
_psp
instance-attribute
_psp: deque[ProductionJob] = deque()
empty
property
empty: bool
Whether the pool contains no jobs.
env
instance-attribute
env = env
jobs
property
jobs: Iterable[ProductionJob]
Iterate over jobs in the pool in FIFO order (oldest first).
new_job
instance-attribute
new_job = event()
shopfloor
instance-attribute
shopfloor = shopfloor
__contains__
__contains__(job: ProductionJob) -> bool
Check if a job is currently in the pool.
__getitem__
__getitem__(index: int) -> ProductionJob
Get a job by its position in the queue (0 = oldest).
__init__
__init__(*, env: Environment, shopfloor: ShopFloor) -> None
Initialize the pre-shop pool.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| env | Environment | The simulation environment. | required |
| shopfloor | ShopFloor | The shopfloor that will receive released jobs. | required |
__len__
__len__() -> int
Return the number of jobs currently in the pool.
_signal_new_job
_signal_new_job(job: ProductionJob) -> None
Invoke arrival callbacks and trigger the new_job event.
First invokes all registered on_arrival callbacks synchronously, then succeeds the SimPy new_job event (waking process-based listeners).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job | ProductionJob | The job to pass to callbacks and as the event's value. | required |
add
add(job: ProductionJob) -> None
Add a job to the pool and signal its arrival.
Appends the job to the end of the queue and triggers the new_job event,
allowing event-driven processes (e.g., starvation avoidance) to react
immediately to the new arrival.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job | ProductionJob | The production job to add to the pool. | required |
jobs_starting_at
jobs_starting_at(server: Server) -> list[ProductionJob]
Return jobs in the pool whose routing begins at the given server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| server | Server | The server to filter by. | required |
Returns:
| Type | Description |
|---|---|
| list[ProductionJob] | List of jobs whose first routing server matches, in FIFO order. |
on_arrival
on_arrival(
callback: Callable[[ProductionJob, PreShopPool], None],
) -> None
Subscribe a callback to be invoked each time a job arrives in the pool.
Callbacks are invoked synchronously during add(), before the SimPy new_job event fires. No env.run() priming is needed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | Callable[[ProductionJob, PreShopPool], None] | Function called with (job, psp) when a job arrives. | required |
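The ordering described above (callbacks first, then the new_job event) can be sketched with a small stand-in class. This is only an illustration: MiniPool and its log attribute are hypothetical, it does not import Simulatte or SimPy, and it assumes the job is appended to the queue before the callbacks run.

```python
from collections import deque
from typing import Callable


class MiniPool:
    """Hypothetical stand-in for PreShopPool; not Simulatte code."""

    def __init__(self) -> None:
        self._jobs: deque = deque()
        self._callbacks: list = []
        self.log: list = []  # records the order in which things happen

    def on_arrival(self, callback: Callable[[str, "MiniPool"], None]) -> None:
        self._callbacks.append(callback)

    def add(self, job: str) -> None:
        self._jobs.append(job)
        # Callbacks run synchronously, in registration order ...
        for callback in self._callbacks:
            callback(job, self)
        # ... and only afterwards would the SimPy new_job event be succeeded.
        self.log.append(f"event:{job}")

    def __len__(self) -> int:
        return len(self._jobs)


pool = MiniPool()
pool.on_arrival(lambda job, psp: psp.log.append(f"callback:{job} (pool size {len(psp)})"))
pool.add("J1")
print(pool.log)
```

Because the callback runs inside add(), it can act on the arrival without waiting for the simulation clock to advance, which is what makes priming with env.run() unnecessary.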
release
release(job: ProductionJob) -> None
Remove a job from the pool and release it to the shopfloor.
Convenience method combining remove() and shopfloor.add(). Use remove() instead if you want to discard a job without releasing it.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job | ProductionJob | The job to release from the pool to the shopfloor. | required |
Raises:
| Type | Description |
|---|---|
| | If the job is not found in the pool. |
remove
remove(
*, job: ProductionJob | None = None
) -> ProductionJob
Remove a job from the pool and record its exit timestamp.
Supports two modes: FIFO removal (default) or specific job removal.
Sets job.psp_exit_at to the current simulation time before returning.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job | ProductionJob \| None | The specific job to remove. If None, removes the oldest job (FIFO). | None |
Returns:
| Type | Description |
|---|---|
| ProductionJob | The removed job with its psp_exit_at timestamp set. |
Raises:
| Type | Description |
|---|---|
| | If a specific job is requested but not found in the pool. |
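The two removal modes can be illustrated with a plain deque. This is a minimal sketch, not the library's implementation: the Job dataclass and the free function remove() are hypothetical stand-ins, the current time is passed in explicitly instead of being read from the environment, and the exception raised by the real method is not stated in this reference.

```python
from collections import deque
from dataclasses import dataclass
from typing import Optional


@dataclass
class Job:
    """Hypothetical stand-in for ProductionJob."""
    name: str
    psp_exit_at: Optional[float] = None


def remove(pool: deque, now: float, job: Optional[Job] = None) -> Job:
    """Pop the oldest job, or a specific one, and stamp its exit time."""
    if job is None:
        removed = pool.popleft()  # FIFO mode: oldest job first
    else:
        pool.remove(job)  # deque.remove raises ValueError if the job is absent
        removed = job
    removed.psp_exit_at = now
    return removed


pool = deque([Job("A"), Job("B"), Job("C")])
first = remove(pool, now=5.0)                  # FIFO removal takes "A"
specific = remove(pool, now=7.5, job=pool[1])  # targeted removal takes "C"
print(first, specific, [j.name for j in pool])
```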
Runner
Bases:
Manage repeated simulations with configurable builder and seeds.
The builder callable should accept an env: Environment parameter.
Supports per-simulation log files when running in parallel, enabling separate log output for each simulation instance.
builder
instance-attribute
builder = builder
extract_fn
instance-attribute
extract_fn = extract_fn
log_dir
instance-attribute
log_dir = log_dir
log_format
instance-attribute
log_format: Literal['text', 'json'] = log_format
n_jobs
instance-attribute
n_jobs = n_jobs
parallel
instance-attribute
parallel = parallel
progress
instance-attribute
progress = progress
seeds
instance-attribute
seeds = seeds
__init__
__init__(
*,
builder: Builder[S],
seeds: Sequence[int],
parallel: bool = False,
progress: bool | None = None,
extract_fn: Callable[[S], T],
n_jobs: int | None = None,
log_dir: Path | None = None,
log_format: Literal["text", "json"] = "text",
) -> None
Initialize the runner.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| builder | Builder[S] | Callable that accepts env: Environment and returns a system | required |
| seeds | Sequence[int] | Sequence of random seeds for each simulation run | required |
| parallel | bool | Whether to run simulations in parallel using multiprocessing | False |
| progress | bool \| None | Whether to show a progress bar (None = auto, based on stderr TTY) | None |
| extract_fn | Callable[[S], T] | Function to extract results from the system after simulation | required |
| n_jobs | int \| None | Number of parallel workers (defaults to CPU count) | None |
| log_dir | Path \| None | Optional directory for per-simulation log files. Each simulation will create a file named sim_XXXX_seed_YYYY.log | None |
| log_format | Literal["text", "json"] | Log output format ("text" or "json") | 'text' |
_run_single
_run_single(args: tuple[int, int, float]) -> tuple[int, T]
Run a single simulation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| args | tuple[int, int, float] | Tuple of (run_id, seed, until) | required |
Returns:
| Type | Description |
|---|---|
| tuple[int, T] | Tuple of (run_id, extracted result) |
run
run(until: float) -> list[T]
Run all simulations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| until | float | Simulation end time | required |
Returns:
| Type | Description |
|---|---|
| list[T] | List of extracted results, one per seed |
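The builder / seeds / extract_fn contract can be sketched without Simulatte as a sequential loop. The names run_replications and toy_builder are hypothetical, and the builder here receives a seeded random.Random instead of an Environment; how the real Runner applies each seed is not described in this reference, so treat this only as an illustration of "one extracted result per seed".

```python
import random
from typing import Callable, Sequence, TypeVar

S = TypeVar("S")
T = TypeVar("T")


def run_replications(
    builder: Callable[[random.Random], S],
    seeds: Sequence[int],
    extract_fn: Callable[[S], T],
) -> list:
    """Build one system per seed and collect one extracted result per seed."""
    results = []
    for seed in seeds:
        rng = random.Random(seed)  # independent, reproducible stream per run
        system = builder(rng)
        results.append(extract_fn(system))
    return results


def toy_builder(rng: random.Random) -> list:
    # Hypothetical "system": ten exponential inter-arrival times.
    return [rng.expovariate(1.0) for _ in range(10)]


def mean(xs: list) -> float:
    return sum(xs) / len(xs)


seeds = [1, 2, 3]
means = run_replications(toy_builder, seeds, mean)
print(means)
```

Re-running with the same seeds reproduces the same results, which is the property that makes replications comparable across experiments.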
Extension points
OperationHook
Bases:
Hook called before or after each operation.
Hooks may be plain synchronous functions (returning None) or generator-based (yielding SimPy events). Both styles can coexist in the same hook list and execute in registration order.
Examples:
A synchronous logging hook::
    def log_hook(job, server, op_index, processing_time):
        print(f"t={server.env.now}: {job.sku} op {op_index} on {server}")
A generator hook that adds setup time::
    def setup_time_hook(job, server, op_index, processing_time):
        setup = 2.0 if job.sku.startswith("COMPLEX") else 0.5
        yield server.env.timeout(setup)

    shopfloor = ShopFloor(env=env, on_before_operation=setup_time_hook)
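One way synchronous and generator-based hooks could coexist in a single list is sketched below. This is an assumption about the mechanism, not Simulatte's actual code: run_hooks is a hypothetical helper, and plain strings stand in for SimPy events.

```python
import inspect


def run_hooks(hooks, *args):
    """Call hooks in registration order; delegate to generator hooks."""
    for hook in hooks:
        result = hook(*args)
        if inspect.isgenerator(result):
            # Generator hook: forward each yielded event to the caller.
            yield from result
        # Synchronous hook: it already ran to completion, nothing to wait for.


calls = []


def sync_hook(job, server, op_index, processing_time):
    calls.append(("sync", op_index))


def generator_hook(job, server, op_index, processing_time):
    calls.append(("generator", op_index))
    yield "setup-delay"  # placeholder for a SimPy event such as env.timeout(...)


events = list(run_hooks([sync_hook, generator_hook], "job", "server", 0, 1.5))
print(calls, events)
```

Inside a SimPy process the same helper would be consumed with `yield from run_hooks(...)`, so only the generator hooks consume simulated time.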
WIPStrategy
Bases:
Strategy for calculating work-in-progress (WIP).
WIP strategies define how processing times are accumulated when jobs enter the shop floor and how they are decremented as operations complete.
Two built-in strategies are provided:
- StandardWIPStrategy: Full processing time per server
- CorrectedWIPStrategy: Position-discounted WIP (1/1, 1/2, 1/3, ...)
StandardWIPStrategy
Default WIP strategy: full processing time added per server.
When a job enters the shop floor, the full processing time for each operation is added to the corresponding server's WIP. When an operation completes, only that operation's processing time is decremented.
CorrectedWIPStrategy
Position-discounted WIP strategy.
Processing times are discounted by operation position:
- 1st operation: full time (1/1)
- 2nd operation: half time (1/2)
- 3rd operation: third time (1/3)
- etc.
As operations complete, remaining operations' WIP values are adjusted upward to reflect their new position in the routing.
This strategy provides a more balanced view of workload when jobs have long routings, preventing downstream servers from appearing overloaded due to jobs that haven't reached them yet.
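The arithmetic behind the two strategies can be worked through on a three-operation routing. The functions below are hypothetical illustrations, not the library's classes, and they assume that "new position" means a remaining operation is re-indexed from 1 once earlier operations have completed.

```python
def standard_wip(times, completed=0):
    """Full processing time for every operation not yet completed."""
    return list(times[completed:])


def corrected_wip(times, completed=0):
    """Remaining times divided by their position among the remaining operations."""
    remaining = times[completed:]
    return [t / position for position, t in enumerate(remaining, start=1)]


routing_times = [6.0, 4.0, 3.0]  # hypothetical processing times per operation

on_entry = corrected_wip(routing_times)                  # 6/1, 4/2, 3/3
after_first = corrected_wip(routing_times, completed=1)  # 4/1, 3/2

print(standard_wip(routing_times))  # [6.0, 4.0, 3.0]
print(on_entry)                     # [6.0, 2.0, 1.0]
print(after_first)                  # [4.0, 1.5]
```

Under these assumptions the third server's contribution rises from 1.0 to 1.5 when the first operation finishes, which is the upward adjustment described above.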
MetricsCollector
Bases:
Collector for job completion metrics.
Metrics collectors receive completed jobs and can compute any desired performance metrics. The built-in EMAMetricsCollector computes exponential moving averages for common metrics.
Example
A simple throughput collector::
    class ThroughputCollector:
        def __init__(self):
            self.count = 0
            self.tardy = 0

        def record(self, job):
            self.count += 1
            if job.lateness > 0:
                self.tardy += 1

    collector = ThroughputCollector()
    shopfloor = ShopFloor(env=env, metrics_collector=collector)
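For comparison, an exponential moving average over job lateness could be collected as follows. This is a hypothetical sketch and not the built-in EMAMetricsCollector: the class name, the alpha parameter, and the choice to seed the average with the first observation are all assumptions; only record(job) and job.lateness come from the example above.

```python
from types import SimpleNamespace


class LatenessEMACollector:
    """Hypothetical collector; not the library's EMAMetricsCollector."""

    def __init__(self, alpha: float = 0.5) -> None:
        self.alpha = alpha        # weight given to the newest observation
        self.ema_lateness = None  # no observations yet

    def record(self, job) -> None:
        if self.ema_lateness is None:
            self.ema_lateness = job.lateness  # first observation seeds the average
        else:
            self.ema_lateness = (
                self.alpha * job.lateness + (1 - self.alpha) * self.ema_lateness
            )


collector = LatenessEMACollector(alpha=0.5)
for lateness in (4.0, 0.0, 2.0):
    # SimpleNamespace stands in for a completed ProductionJob.
    collector.record(SimpleNamespace(lateness=lateness))
print(collector.ema_lateness)  # 4.0 -> 2.0 -> 2.0
```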
Dispatcher
Bases:
Reference protocol showing the full dispatcher interface.
All methods are optional at runtime — attach_dispatcher wires
only those that are present and callable on the dispatcher object.
This protocol is NOT runtime-checkable. It exists for documentation and IDE support. Partial implementations are explicitly supported.
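The "wire only what is present and callable" rule can be sketched with getattr and callable. The helper wire_present_methods and the hook names used here are hypothetical; the real method names and the behavior of attach_dispatcher are not shown in this section.

```python
def wire_present_methods(dispatcher, method_names):
    """Return {name: bound method} for names that exist and are callable."""
    wired = {}
    for name in method_names:
        candidate = getattr(dispatcher, name, None)
        if callable(candidate):
            wired[name] = candidate
    return wired


class PartialDispatcher:
    # Implements only one of the hypothetical hook names below.
    def on_job_arrival(self, job):
        return f"arrived:{job}"

    on_job_finished = None  # present but not callable, so it is skipped


HOOK_NAMES = ["on_job_arrival", "on_job_finished", "on_tick"]  # hypothetical names
wired = wire_present_methods(PartialDispatcher(), HOOK_NAMES)
print(sorted(wired))
```

A partial implementation therefore needs no stub methods: anything missing or non-callable is simply left unwired.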