"""
State-Task Network for SPROCLIB
This module implements State-Task Network (STN) scheduling for batch processes
with material balances, equipment constraints, and optimization.
Author: Thorsten Gressling <gressling@paramus.ai>
License: MIT License
"""
import numpy as np
from typing import Optional, Tuple, Dict, Any, List
import matplotlib.pyplot as plt
import logging
logger = logging.getLogger(__name__)
[docs]
class StateTaskNetwork:
"""State-Task Network for batch process scheduling."""
[docs]
def __init__(self, name: str = "STN"):
"""
Initialize State-Task Network.
Args:
name: Network name
"""
self.name = name
self.states = {} # Materials/intermediates
self.tasks = {} # Process tasks
self.units = {} # Equipment units
self.schedule = {}
logger.info(f"State-Task Network '{name}' initialized")
[docs]
def add_state(
self,
name: str,
capacity: float = float('inf'),
initial_amount: float = 0.0,
price: float = 0.0,
is_product: bool = False
):
"""
Add a state (material) to the network.
Args:
name: State name
capacity: Storage capacity
initial_amount: Initial inventory
price: Unit price/cost
is_product: Whether this is a final product
"""
self.states[name] = {
'capacity': capacity,
'initial_amount': initial_amount,
'price': price,
'is_product': is_product
}
logger.info(f"Added state '{name}' to STN")
[docs]
def add_task(
self,
name: str,
duration: float,
inputs: Dict[str, float],
outputs: Dict[str, float],
suitable_units: List[str],
variable_cost: float = 0.0
):
"""
Add a task to the network.
Args:
name: Task name
duration: Task duration
inputs: Input materials {state: amount}
outputs: Output materials {state: amount}
suitable_units: List of units that can perform this task
variable_cost: Variable cost per batch
"""
self.tasks[name] = {
'duration': duration,
'inputs': inputs,
'outputs': outputs,
'suitable_units': suitable_units,
'variable_cost': variable_cost
}
logger.info(f"Added task '{name}' to STN")
[docs]
def add_unit(
self,
name: str,
capacity: float = 1.0,
unit_cost: float = 0.0,
availability: float = 1.0
):
"""
Add an equipment unit to the network.
Args:
name: Unit name
capacity: Unit capacity multiplier
unit_cost: Operating cost per time unit
availability: Unit availability (0-1)
"""
self.units[name] = {
'capacity': capacity,
'unit_cost': unit_cost,
'availability': availability
}
logger.info(f"Added unit '{name}' to STN")
[docs]
def optimize_schedule(
self,
time_horizon: int,
objective: str = 'profit',
demand: Optional[Dict[str, float]] = None,
method: str = 'greedy'
) -> Dict[str, Any]:
"""
Optimize production schedule.
Args:
time_horizon: Scheduling horizon [time units]
objective: Optimization objective ('profit', 'production', 'makespan')
demand: Product demand requirements
method: Solution method ('greedy', 'milp')
Returns:
Optimized schedule
"""
if method == 'greedy':
return self._greedy_scheduling(time_horizon, objective, demand)
elif method == 'milp':
return self._milp_scheduling(time_horizon, objective, demand)
else:
raise ValueError(f"Unknown scheduling method: {method}")
def _greedy_scheduling(
self,
time_horizon: int,
objective: str,
demand: Optional[Dict[str, float]]
) -> Dict[str, Any]:
"""
Greedy heuristic scheduling algorithm.
Args:
time_horizon: Time horizon
objective: Objective function
demand: Demand requirements
Returns:
Schedule solution
"""
schedule = {}
time_slots = list(range(time_horizon))
# Initialize unit schedules
for unit in self.units:
schedule[unit] = [None] * time_horizon
# Track material inventories
inventories = {state: info['initial_amount']
for state, info in self.states.items()}
# Track metrics
total_profit = 0.0
total_production = {state: 0.0 for state, info in self.states.items()
if info.get('is_product', False)}
# Task priority based on objective
tasks_by_priority = self._calculate_task_priorities(objective, demand)
# Greedy scheduling loop
for t in time_slots:
for task_name in tasks_by_priority:
task_info = self.tasks[task_name]
# Find available unit
available_unit = None
for unit in task_info['suitable_units']:
if unit in self.units and schedule[unit][t] is None:
# Check unit availability
if np.random.random() <= self.units[unit]['availability']:
available_unit = unit
break
if available_unit is None:
continue
# Check material availability
can_start = True
for input_state, amount in task_info['inputs'].items():
required_amount = amount * self.units[available_unit]['capacity']
if inventories.get(input_state, 0) < required_amount:
can_start = False
break
if not can_start:
continue
# Schedule task
duration = int(task_info['duration'])
end_time = min(t + duration, time_horizon)
# Reserve unit
for tt in range(t, end_time):
if tt < time_horizon:
schedule[available_unit][tt] = task_name
# Update inventories
unit_capacity = self.units[available_unit]['capacity']
# Consume inputs
for input_state, amount in task_info['inputs'].items():
inventories[input_state] -= amount * unit_capacity
# Produce outputs
for output_state, amount in task_info['outputs'].items():
produced = amount * unit_capacity
inventories[output_state] = inventories.get(output_state, 0) + produced
# Calculate profit/revenue
if output_state in self.states:
state_info = self.states[output_state]
revenue = produced * state_info['price']
total_profit += revenue
# Track production
if state_info.get('is_product', False):
total_production[output_state] += produced
# Subtract variable costs
total_profit -= task_info['variable_cost']
total_profit -= self.units[available_unit]['unit_cost'] * duration
# Break to next time slot (greedy: one task per time slot per unit)
break
# Calculate performance metrics
demand_satisfaction = {}
if demand:
for product, required in demand.items():
produced = total_production.get(product, 0)
satisfaction = min(produced / required, 1.0) if required > 0 else 1.0
demand_satisfaction[product] = satisfaction
self.schedule = {
'unit_schedules': schedule,
'final_inventories': inventories,
'total_profit': total_profit,
'total_production': total_production,
'demand_satisfaction': demand_satisfaction,
'time_horizon': time_horizon,
'method': 'greedy'
}
logger.info(f"Greedy scheduling completed: profit = {total_profit:.2f}")
return self.schedule
def _calculate_task_priorities(
self,
objective: str,
demand: Optional[Dict[str, float]]
) -> List[str]:
"""
Calculate task priorities based on objective.
Args:
objective: Objective function
demand: Demand requirements
Returns:
List of tasks ordered by priority (highest first)
"""
task_scores = {}
for task_name, task_info in self.tasks.items():
score = 0.0
if objective == 'profit':
# Calculate profit potential
revenue = 0.0
cost = task_info['variable_cost']
for output_state, amount in task_info['outputs'].items():
if output_state in self.states:
revenue += amount * self.states[output_state]['price']
score = revenue - cost
elif objective == 'production':
# Prioritize tasks that produce final products
for output_state, amount in task_info['outputs'].items():
if output_state in self.states and self.states[output_state].get('is_product', False):
score += amount
elif objective == 'makespan':
# Prioritize shorter tasks
score = -task_info['duration'] # Negative for shorter = better
# Apply demand weighting
if demand:
demand_weight = 1.0
for output_state, amount in task_info['outputs'].items():
if output_state in demand:
demand_weight = max(demand_weight, demand[output_state] / 10.0)
score *= demand_weight
task_scores[task_name] = score
# Sort by score (descending)
sorted_tasks = sorted(task_scores.keys(), key=lambda x: task_scores[x], reverse=True)
return sorted_tasks
def _milp_scheduling(
self,
time_horizon: int,
objective: str,
demand: Optional[Dict[str, float]]
) -> Dict[str, Any]:
"""
Mixed-Integer Linear Programming scheduling (simplified implementation).
Args:
time_horizon: Time horizon
objective: Objective function
demand: Demand requirements
Returns:
Schedule solution
"""
# This is a placeholder for MILP implementation
# In practice, this would use specialized MILP solvers like Gurobi or CPLEX
logger.warning("MILP scheduling not fully implemented, falling back to greedy")
return self._greedy_scheduling(time_horizon, objective, demand)
[docs]
def plot_schedule(
self,
figsize: Tuple[int, int] = (12, 6),
show_inventories: bool = True
):
"""
Plot Gantt chart of the schedule.
Args:
figsize: Figure size
show_inventories: Whether to show inventory plots
"""
if not self.schedule:
logger.error("No schedule to plot")
return
n_plots = 2 if show_inventories else 1
fig, axes = plt.subplots(n_plots, 1, figsize=(figsize[0], figsize[1] * n_plots))
if n_plots == 1:
axes = [axes]
# Gantt chart
ax = axes[0]
units = list(self.schedule['unit_schedules'].keys())
time_horizon = self.schedule['time_horizon']
# Color map for tasks
task_names = list(self.tasks.keys())
colors = plt.cm.Set3(np.linspace(0, 1, len(task_names)))
task_colors = dict(zip(task_names, colors))
# Create Gantt chart
for i, unit in enumerate(units):
schedule_unit = self.schedule['unit_schedules'][unit]
current_task = None
start_time = 0
for t, task in enumerate(schedule_unit):
if task != current_task:
if current_task is not None:
# Plot previous task
color = task_colors.get(current_task, 'gray')
ax.barh(i, t - start_time, left=start_time,
height=0.6, alpha=0.8, color=color,
label=current_task if current_task not in [t.get_text() for t in ax.get_legend().get_texts()] else "")
current_task = task
start_time = t
# Plot last task
if current_task is not None:
color = task_colors.get(current_task, 'gray')
ax.barh(i, len(schedule_unit) - start_time, left=start_time,
height=0.6, alpha=0.8, color=color,
label=current_task if current_task not in [t.get_text() for t in ax.get_legend().get_texts()] else "")
ax.set_yticks(range(len(units)))
ax.set_yticklabels(units)
ax.set_xlabel('Time')
ax.set_ylabel('Units')
ax.set_title(f'{self.name} - Production Schedule')
ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
ax.grid(True, alpha=0.3)
# Inventory plot
if show_inventories and n_plots > 1:
ax2 = axes[1]
# Simulate inventory evolution (simplified)
time_points = range(time_horizon + 1)
inventories = {state: [info['initial_amount']] for state, info in self.states.items()}
# Simple inventory tracking
for t in range(time_horizon):
# Copy previous inventories
for state in inventories:
inventories[state].append(inventories[state][-1])
# Plot inventories
for state, inventory_history in inventories.items():
if self.states[state].get('is_product', False) or state in ['FeedA', 'FeedB']: # Show key materials
ax2.plot(time_points, inventory_history, marker='o', label=state)
ax2.set_xlabel('Time')
ax2.set_ylabel('Inventory')
ax2.set_title('Material Inventories')
ax2.legend()
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
[docs]
def get_schedule_metrics(self) -> Dict[str, Any]:
"""
Calculate schedule performance metrics.
Returns:
Dictionary with performance metrics
"""
if not self.schedule:
return {}
# Basic metrics
metrics = {
'total_profit': self.schedule.get('total_profit', 0),
'total_production': self.schedule.get('total_production', {}),
'demand_satisfaction': self.schedule.get('demand_satisfaction', {}),
'time_horizon': self.schedule.get('time_horizon', 0)
}
# Unit utilization
unit_utilization = {}
for unit, schedule_unit in self.schedule['unit_schedules'].items():
busy_slots = sum(1 for slot in schedule_unit if slot is not None)
utilization = busy_slots / len(schedule_unit) if len(schedule_unit) > 0 else 0
unit_utilization[unit] = utilization
metrics['unit_utilization'] = unit_utilization
metrics['average_utilization'] = np.mean(list(unit_utilization.values())) if unit_utilization else 0
# Task distribution
task_counts = {}
for unit, schedule_unit in self.schedule['unit_schedules'].items():
for task in schedule_unit:
if task is not None:
task_counts[task] = task_counts.get(task, 0) + 1
metrics['task_distribution'] = task_counts
return metrics