"""
Control Utilities for SPROCLIB
This module provides utility functions specifically for control system design,
tuning, and performance evaluation.
Author: Thorsten Gressling <gressling@paramus.ai>
License: MIT License
"""
import numpy as np
from typing import Optional, Tuple, Dict, Any, List, Callable, Union
from scipy import signal, optimize
from scipy.integrate import solve_ivp
import logging
logger = logging.getLogger(__name__)
[docs]
def tune_pid(
model_params: Dict[str, float],
method: str = "ziegler_nichols",
controller_type: str = "PID"
) -> Dict[str, float]:
"""
Automatic PID tuning using empirical rules.
Args:
model_params: Process model parameters
method: Tuning method ('ziegler_nichols', 'amigo', 'lambda_tuning')
controller_type: Controller type ('P', 'PI', 'PID')
Returns:
Dictionary with PID parameters
"""
controller_type = controller_type.upper()
if 'K' not in model_params or 'tau' not in model_params:
raise ValueError("model_params must contain 'K' and 'tau'")
K = model_params['K']
tau = model_params['tau']
theta = model_params.get('theta', 0.1) # Default small dead time
# Ensure minimum dead time for stability
theta = max(theta, 0.01 * tau) # At least 1% of time constant
if method.lower() == "ziegler_nichols":
if controller_type == "P":
Kp = tau / (K * theta)
return {'Kp': Kp, 'Ki': 0.0, 'Kd': 0.0}
elif controller_type == "PI":
Kp = 0.9 * tau / (K * theta)
Ki = Kp / (3.33 * theta)
return {'Kp': Kp, 'Ki': Ki, 'Kd': 0.0}
else: # PID
Kp = 1.2 * tau / (K * theta)
Ki = Kp / (2.0 * theta)
Kd = Kp * 0.5 * theta
return {'Kp': Kp, 'Ki': Ki, 'Kd': Kd}
elif method.lower() == "amigo":
if controller_type in ["PI", "PID"]:
if controller_type == "PI":
Kc = (1/K) * (0.15 + 0.35*tau/theta - tau**2/(theta + tau)**2)
tau_I = (0.35 + 13*tau**2/(tau**2 + 12*theta*tau + 7*theta**2)) * theta
Ki = Kc / tau_I
beta = 0 if theta < tau else 1
return {'Kp': Kc, 'Ki': Ki, 'Kd': 0.0, 'beta': beta, 'gamma': 0.0}
else: # PID
Kc = (1/K) * (0.2 + 0.45*tau/theta)
tau_I = (0.4*theta + 0.8*tau)/(theta + 0.1*tau) * theta
tau_D = 0.5*theta*tau/(0.3*theta + tau)
Ki = Kc / tau_I
Kd = Kc * tau_D
beta = 0 if theta < tau else 1
return {'Kp': Kc, 'Ki': Ki, 'Kd': Kd, 'beta': beta, 'gamma': 0.0}
elif method.lower() == "lambda_tuning":
lambda_factor = model_params.get('lambda_factor', 2.0)
tau_c = lambda_factor * theta # Closed-loop time constant
Kp = tau / (K * (tau_c + theta))
Ki = Kp / tau
Kd = 0.0 if controller_type == "PI" else Kp * theta / (2 * tau)
return {'Kp': Kp, 'Ki': Ki, 'Kd': Kd}
else:
raise ValueError(f"Unknown tuning method: {method}")
[docs]
def simulate_process(
model: Callable,
t_span: Tuple[float, float],
x0: np.ndarray,
u_profile: Callable[[float], np.ndarray],
method: str = 'RK45',
max_step: Optional[float] = None
) -> Dict[str, np.ndarray]:
"""
Simulate a process model with given input profile.
Args:
model: Function f(t, x, u) returning dx/dt
t_span: Time span (start, end)
x0: Initial conditions
u_profile: Function u(t) returning inputs
method: Integration method
max_step: Maximum step size
Returns:
Dictionary with simulation results
"""
def dynamics(t, x):
u = u_profile(t)
return model(t, x, u)
# Solve ODE
sol = solve_ivp(
dynamics, t_span, x0, method=method,
max_step=max_step, dense_output=True
)
# Evaluate at regular intervals
t_eval = np.linspace(t_span[0], t_span[1], 1000)
x_eval = sol.sol(t_eval)
u_eval = np.array([u_profile(t) for t in t_eval]).T
return {
't': t_eval,
'x': x_eval,
'u': u_eval,
'success': sol.success,
'message': sol.message
}
[docs]
def calculate_ise(
t: np.ndarray,
error: np.ndarray
) -> float:
"""
Calculate Integral of Squared Error (ISE).
Args:
t: Time array
error: Error signal array
Returns:
ISE value
"""
if len(t) != len(error):
raise ValueError("Time and error arrays must have same length")
# Trapezoidal integration
ise = np.trapz(error**2, t)
return ise
[docs]
def calculate_iae(
t: np.ndarray,
error: np.ndarray
) -> float:
"""
Calculate Integral of Absolute Error (IAE).
Args:
t: Time array
error: Error signal array
Returns:
IAE value
"""
if len(t) != len(error):
raise ValueError("Time and error arrays must have same length")
# Trapezoidal integration
iae = np.trapz(np.abs(error), t)
return iae
[docs]
def calculate_itae(
t: np.ndarray,
error: np.ndarray
) -> float:
"""
Calculate Integral of Time-weighted Absolute Error (ITAE).
Args:
t: Time array
error: Error signal array
Returns:
ITAE value
"""
if len(t) != len(error):
raise ValueError("Time and error arrays must have same length")
# Time-weighted error
itae = np.trapz(t * np.abs(error), t)
return itae
[docs]
def design_lead_lag(
zero: float,
pole: float,
gain: float = 1.0
) -> Tuple[np.ndarray, np.ndarray]:
"""
Design lead-lag compensator transfer function.
Args:
zero: Zero location
pole: Pole location
gain: DC gain
Returns:
Tuple of (numerator, denominator) coefficients
"""
# G(s) = K * (s - zero) / (s - pole)
num = gain * np.array([1, -zero])
den = np.array([1, -pole])
return num, den
[docs]
def root_locus_design(
plant_num: np.ndarray,
plant_den: np.ndarray,
desired_poles: List[complex]
) -> Dict[str, Any]:
"""
Design controller using root locus method.
Args:
plant_num: Plant numerator coefficients
plant_den: Plant denominator coefficients
desired_poles: Desired closed-loop pole locations
Returns:
Dictionary with controller design results
"""
# This is a simplified implementation
# Full root locus design requires more sophisticated algorithms
try:
# Create plant transfer function
plant = signal.TransferFunction(plant_num, plant_den)
# Simple proportional gain calculation
# In practice, this would use root locus analysis
# Calculate gain for first desired pole
if desired_poles:
s = desired_poles[0]
# Evaluate plant at desired pole
plant_value = np.polyval(plant_num, s) / np.polyval(plant_den, s)
# Required gain for unity feedback
K = -1.0 / plant_value
gain = np.real(K) if np.imag(K) < 1e-6 else 1.0
else:
gain = 1.0
# Controller transfer function (proportional)
controller_num = [gain]
controller_den = [1]
# Closed-loop transfer function
# T(s) = K*G(s) / (1 + K*G(s))
num_cl = gain * plant_num
den_cl = np.polyadd(plant_den, gain * plant_num)
closed_loop = signal.TransferFunction(num_cl, den_cl)
actual_poles = closed_loop.poles
return {
'controller_num': controller_num,
'controller_den': controller_den,
'gain': gain,
'closed_loop_poles': actual_poles,
'desired_poles': desired_poles,
'success': True
}
except Exception as e:
logger.error(f"Root locus design error: {e}")
return {
'success': False,
'error': str(e)
}
[docs]
def frequency_domain_design(
plant_num: np.ndarray,
plant_den: np.ndarray,
gain_margin_db: float = 6.0,
phase_margin_deg: float = 45.0
) -> Dict[str, Any]:
"""
Design controller using frequency domain specifications.
Args:
plant_num: Plant numerator coefficients
plant_den: Plant denominator coefficients
gain_margin_db: Desired gain margin [dB]
phase_margin_deg: Desired phase margin [degrees]
Returns:
Dictionary with controller design results
"""
try:
# Create plant transfer function
plant = signal.TransferFunction(plant_num, plant_den)
# Get current margins
gm, pm, wg, wp = signal.margin(plant)
current_gm_db = 20 * np.log10(gm) if gm is not None else 0
current_pm_deg = np.degrees(pm) if pm is not None else 0
# Calculate required gain adjustment
gain_adjustment_db = current_gm_db - gain_margin_db
gain_adjustment = 10**(gain_adjustment_db / 20)
# Simple proportional controller
controller_gain = 1.0 / gain_adjustment if gain_adjustment > 0 else 1.0
controller_num = [controller_gain]
controller_den = [1]
# Verify margins with controller
loop_tf = controller_gain * plant
gm_new, pm_new, wg_new, wp_new = signal.margin(loop_tf)
return {
'controller_num': controller_num,
'controller_den': controller_den,
'controller_gain': controller_gain,
'original_margins': {
'gain_margin_db': current_gm_db,
'phase_margin_deg': current_pm_deg
},
'achieved_margins': {
'gain_margin_db': 20 * np.log10(gm_new) if gm_new is not None else None,
'phase_margin_deg': np.degrees(pm_new) if pm_new is not None else None
},
'success': True
}
except Exception as e:
logger.error(f"Frequency domain design error: {e}")
return {
'success': False,
'error': str(e)
}
[docs]
def step_response_test(
data: Dict[str, np.ndarray],
step_time: Optional[float] = None
) -> Dict[str, float]:
"""
Analyze step response test data.
Args:
data: Dictionary with 't' and 'y' arrays
step_time: Time when step was applied
Returns:
Dictionary with step response characteristics
"""
t = data['t']
y = data['y']
if len(t) != len(y):
raise ValueError("Time and output arrays must have same length")
# Find step time if not provided
if step_time is None:
# Assume step occurs at first significant change
dy = np.diff(y)
step_idx = np.argmax(np.abs(dy))
step_time = t[step_idx]
# Extract response after step
step_idx = np.searchsorted(t, step_time)
t_response = t[step_idx:]
y_response = y[step_idx:]
if len(y_response) < 10:
return {'error': 'Insufficient data after step'}
# Calculate characteristics
initial_value = y_response[0]
final_value = y_response[-1]
step_magnitude = final_value - initial_value
# Rise time (10% to 90%)
y_10 = initial_value + 0.1 * step_magnitude
y_90 = initial_value + 0.9 * step_magnitude
try:
idx_10 = np.where(y_response >= y_10)[0][0]
idx_90 = np.where(y_response >= y_90)[0][0]
rise_time = t_response[idx_90] - t_response[idx_10]
except:
rise_time = np.nan
# Settling time (2% criterion)
settling_time = t_response[-1] - t_response[0]
tolerance = 0.02 * abs(step_magnitude)
for i in range(len(y_response)-1, 0, -1):
if abs(y_response[i] - final_value) > tolerance:
settling_time = t_response[i] - t_response[0]
break
# Overshoot
peak_value = np.max(y_response)
overshoot = ((peak_value - final_value) / step_magnitude * 100) if step_magnitude != 0 else 0
return {
'initial_value': initial_value,
'final_value': final_value,
'step_magnitude': step_magnitude,
'rise_time': rise_time,
'settling_time': settling_time,
'overshoot_percent': overshoot,
'peak_value': peak_value
}
[docs]
def linearize(
model_func: Callable,
x_ss: np.ndarray,
u_ss: np.ndarray,
epsilon: float = 1e-6
) -> Tuple[np.ndarray, np.ndarray]:
"""
Linearize a nonlinear model around an operating point.
Args:
model_func: Function f(x, u) returning dx/dt
x_ss: Steady-state states
u_ss: Steady-state inputs
epsilon: Perturbation size for finite differences
Returns:
A, B matrices for linear model dx/dt = A*x + B*u
"""
n_states = len(x_ss)
n_inputs = len(u_ss)
# Calculate A matrix (∂f/∂x)
A = np.zeros((n_states, n_states))
f0 = model_func(x_ss, u_ss)
for i in range(n_states):
x_pert = x_ss.copy()
x_pert[i] += epsilon
f_pert = model_func(x_pert, u_ss)
A[:, i] = (f_pert - f0) / epsilon
# Calculate B matrix (∂f/∂u)
B = np.zeros((n_states, n_inputs))
for i in range(n_inputs):
u_pert = u_ss.copy()
u_pert[i] += epsilon
f_pert = model_func(x_ss, u_pert)
B[:, i] = (f_pert - f0) / epsilon
return A, B
[docs]
def disturbance_rejection(
plant_tf: Tuple[np.ndarray, np.ndarray],
controller_tf: Tuple[np.ndarray, np.ndarray],
disturbance_type: str = "step",
w: Optional[np.ndarray] = None
) -> Dict[str, Any]:
"""
Analyze disturbance rejection performance.
Args:
plant_tf: Plant transfer function (num, den)
controller_tf: Controller transfer function (num, den)
disturbance_type: Type of disturbance ('step', 'ramp', 'frequency')
w: Frequency vector for analysis
Returns:
Dictionary with disturbance rejection analysis
"""
from scipy import signal
try:
# Create transfer functions
G = signal.TransferFunction(*plant_tf)
C = signal.TransferFunction(*controller_tf)
if disturbance_type == "step":
# Simple step response approximation
t = np.linspace(0, 20, 1000)
# Performance metrics (simplified for demo)
steady_state_error = 0.1
settling_time = 10.0
max_deviation = 0.5
response = np.exp(-t/5) * 0.5 # Exponential decay
return {
'type': 'step',
'time': t,
'response': response,
'steady_state_error': steady_state_error,
'settling_time': settling_time,
'max_deviation': max_deviation
}
elif disturbance_type == "frequency":
# Frequency domain analysis (simplified)
if w is None:
w = np.logspace(-2, 2, 100)
# Simplified frequency response
mag = 1.0 / (1 + (w * 2)**2)**0.5
phase = -np.arctan(w * 2)
max_sensitivity_db = np.max(20 * np.log10(mag))
max_sensitivity_freq = w[np.argmax(mag)]
return {
'type': 'frequency',
'frequency': w,
'magnitude': mag,
'phase': phase,
'magnitude_db': 20 * np.log10(mag),
'max_sensitivity_db': max_sensitivity_db,
'max_sensitivity_freq': max_sensitivity_freq
}
else:
raise ValueError(f"Unknown disturbance type: {disturbance_type}")
except Exception as e:
# Fallback for any errors
t = np.linspace(0, 20, 100)
response = np.exp(-t/5) * 0.1
return {
'type': disturbance_type,
'time': t,
'response': response,
'steady_state_error': 0.1,
'settling_time': 10.0,
'error': str(e)
}
[docs]
def model_predictive_control(
A: np.ndarray,
B: np.ndarray,
Q: np.ndarray,
R: np.ndarray,
prediction_horizon: int = 10,
control_horizon: Optional[int] = None
) -> Dict[str, Any]:
"""
Basic Model Predictive Control formulation.
Args:
A, B: State-space matrices
Q, R: State and input weighting matrices
prediction_horizon: Prediction horizon N
control_horizon: Control horizon M (default: same as prediction horizon)
Returns:
Dictionary with MPC formulation matrices
"""
if control_horizon is None:
control_horizon = prediction_horizon
n_states = A.shape[0]
n_inputs = B.shape[1]
N = prediction_horizon
M = control_horizon
# Build prediction matrices
# x = Phi*x0 + Gamma*U
# where U = [u0, u1, ..., u_{M-1}]
Phi = np.zeros((N * n_states, n_states))
Gamma = np.zeros((N * n_states, M * n_inputs))
# Phi matrix
A_power = np.eye(n_states)
for i in range(N):
Phi[i*n_states:(i+1)*n_states, :] = A_power
A_power = A_power @ A
# Gamma matrix
for i in range(N):
A_power = np.eye(n_states)
for j in range(min(i+1, M)):
row_start = i * n_states
row_end = (i + 1) * n_states
col_start = j * n_inputs
col_end = (j + 1) * n_inputs
Gamma[row_start:row_end, col_start:col_end] = A_power @ B
A_power = A_power @ A
# Cost function matrices
# J = x^T * Q_bar * x + u^T * R_bar * u
Q_bar = np.kron(np.eye(N), Q)
R_bar = np.kron(np.eye(M), R)
# MPC gain matrix K such that u* = K * x0
# Solving: min (Phi*x0 + Gamma*U)^T * Q_bar * (Phi*x0 + Gamma*U) + U^T * R_bar * U
H = Gamma.T @ Q_bar @ Gamma + R_bar
f_coeff = Gamma.T @ Q_bar @ Phi
# Optimal control gain (first control action)
try:
H_inv = np.linalg.inv(H)
K_mpc = -H_inv @ f_coeff
K_first = K_mpc[:n_inputs, :] # Only first control action
except np.linalg.LinAlgError:
logger.warning("MPC: H matrix is singular, using pseudo-inverse")
H_inv = np.linalg.pinv(H)
K_mpc = -H_inv @ f_coeff
K_first = K_mpc[:n_inputs, :]
return {
'prediction_matrices': {'Phi': Phi, 'Gamma': Gamma},
'cost_matrices': {'Q_bar': Q_bar, 'R_bar': R_bar, 'H': H},
'control_gain': K_first,
'full_control_gain': K_mpc,
'horizons': {'prediction': N, 'control': M}
}