Source code for sproclib.utilities.control_utils

"""
Control Utilities for SPROCLIB

This module provides utility functions specifically for control system design,
tuning, and performance evaluation.

Author: Thorsten Gressling <gressling@paramus.ai>
License: MIT License
"""

import numpy as np
from typing import Optional, Tuple, Dict, Any, List, Callable, Union
from scipy import signal, optimize
from scipy.integrate import solve_ivp
import logging

logger = logging.getLogger(__name__)


[docs] def tune_pid( model_params: Dict[str, float], method: str = "ziegler_nichols", controller_type: str = "PID" ) -> Dict[str, float]: """ Automatic PID tuning using empirical rules. Args: model_params: Process model parameters method: Tuning method ('ziegler_nichols', 'amigo', 'lambda_tuning') controller_type: Controller type ('P', 'PI', 'PID') Returns: Dictionary with PID parameters """ controller_type = controller_type.upper() if 'K' not in model_params or 'tau' not in model_params: raise ValueError("model_params must contain 'K' and 'tau'") K = model_params['K'] tau = model_params['tau'] theta = model_params.get('theta', 0.1) # Default small dead time # Ensure minimum dead time for stability theta = max(theta, 0.01 * tau) # At least 1% of time constant if method.lower() == "ziegler_nichols": if controller_type == "P": Kp = tau / (K * theta) return {'Kp': Kp, 'Ki': 0.0, 'Kd': 0.0} elif controller_type == "PI": Kp = 0.9 * tau / (K * theta) Ki = Kp / (3.33 * theta) return {'Kp': Kp, 'Ki': Ki, 'Kd': 0.0} else: # PID Kp = 1.2 * tau / (K * theta) Ki = Kp / (2.0 * theta) Kd = Kp * 0.5 * theta return {'Kp': Kp, 'Ki': Ki, 'Kd': Kd} elif method.lower() == "amigo": if controller_type in ["PI", "PID"]: if controller_type == "PI": Kc = (1/K) * (0.15 + 0.35*tau/theta - tau**2/(theta + tau)**2) tau_I = (0.35 + 13*tau**2/(tau**2 + 12*theta*tau + 7*theta**2)) * theta Ki = Kc / tau_I beta = 0 if theta < tau else 1 return {'Kp': Kc, 'Ki': Ki, 'Kd': 0.0, 'beta': beta, 'gamma': 0.0} else: # PID Kc = (1/K) * (0.2 + 0.45*tau/theta) tau_I = (0.4*theta + 0.8*tau)/(theta + 0.1*tau) * theta tau_D = 0.5*theta*tau/(0.3*theta + tau) Ki = Kc / tau_I Kd = Kc * tau_D beta = 0 if theta < tau else 1 return {'Kp': Kc, 'Ki': Ki, 'Kd': Kd, 'beta': beta, 'gamma': 0.0} elif method.lower() == "lambda_tuning": lambda_factor = model_params.get('lambda_factor', 2.0) tau_c = lambda_factor * theta # Closed-loop time constant Kp = tau / (K * (tau_c + theta)) Ki = Kp / tau Kd = 0.0 if controller_type == "PI" else Kp * theta / (2 * tau) return {'Kp': Kp, 'Ki': Ki, 'Kd': Kd} else: raise ValueError(f"Unknown tuning method: {method}")
[docs] def simulate_process( model: Callable, t_span: Tuple[float, float], x0: np.ndarray, u_profile: Callable[[float], np.ndarray], method: str = 'RK45', max_step: Optional[float] = None ) -> Dict[str, np.ndarray]: """ Simulate a process model with given input profile. Args: model: Function f(t, x, u) returning dx/dt t_span: Time span (start, end) x0: Initial conditions u_profile: Function u(t) returning inputs method: Integration method max_step: Maximum step size Returns: Dictionary with simulation results """ def dynamics(t, x): u = u_profile(t) return model(t, x, u) # Solve ODE sol = solve_ivp( dynamics, t_span, x0, method=method, max_step=max_step, dense_output=True ) # Evaluate at regular intervals t_eval = np.linspace(t_span[0], t_span[1], 1000) x_eval = sol.sol(t_eval) u_eval = np.array([u_profile(t) for t in t_eval]).T return { 't': t_eval, 'x': x_eval, 'u': u_eval, 'success': sol.success, 'message': sol.message }
[docs] def calculate_ise( t: np.ndarray, error: np.ndarray ) -> float: """ Calculate Integral of Squared Error (ISE). Args: t: Time array error: Error signal array Returns: ISE value """ if len(t) != len(error): raise ValueError("Time and error arrays must have same length") # Trapezoidal integration ise = np.trapz(error**2, t) return ise
[docs] def calculate_iae( t: np.ndarray, error: np.ndarray ) -> float: """ Calculate Integral of Absolute Error (IAE). Args: t: Time array error: Error signal array Returns: IAE value """ if len(t) != len(error): raise ValueError("Time and error arrays must have same length") # Trapezoidal integration iae = np.trapz(np.abs(error), t) return iae
[docs] def calculate_itae( t: np.ndarray, error: np.ndarray ) -> float: """ Calculate Integral of Time-weighted Absolute Error (ITAE). Args: t: Time array error: Error signal array Returns: ITAE value """ if len(t) != len(error): raise ValueError("Time and error arrays must have same length") # Time-weighted error itae = np.trapz(t * np.abs(error), t) return itae
[docs] def design_lead_lag( zero: float, pole: float, gain: float = 1.0 ) -> Tuple[np.ndarray, np.ndarray]: """ Design lead-lag compensator transfer function. Args: zero: Zero location pole: Pole location gain: DC gain Returns: Tuple of (numerator, denominator) coefficients """ # G(s) = K * (s - zero) / (s - pole) num = gain * np.array([1, -zero]) den = np.array([1, -pole]) return num, den
[docs] def root_locus_design( plant_num: np.ndarray, plant_den: np.ndarray, desired_poles: List[complex] ) -> Dict[str, Any]: """ Design controller using root locus method. Args: plant_num: Plant numerator coefficients plant_den: Plant denominator coefficients desired_poles: Desired closed-loop pole locations Returns: Dictionary with controller design results """ # This is a simplified implementation # Full root locus design requires more sophisticated algorithms try: # Create plant transfer function plant = signal.TransferFunction(plant_num, plant_den) # Simple proportional gain calculation # In practice, this would use root locus analysis # Calculate gain for first desired pole if desired_poles: s = desired_poles[0] # Evaluate plant at desired pole plant_value = np.polyval(plant_num, s) / np.polyval(plant_den, s) # Required gain for unity feedback K = -1.0 / plant_value gain = np.real(K) if np.imag(K) < 1e-6 else 1.0 else: gain = 1.0 # Controller transfer function (proportional) controller_num = [gain] controller_den = [1] # Closed-loop transfer function # T(s) = K*G(s) / (1 + K*G(s)) num_cl = gain * plant_num den_cl = np.polyadd(plant_den, gain * plant_num) closed_loop = signal.TransferFunction(num_cl, den_cl) actual_poles = closed_loop.poles return { 'controller_num': controller_num, 'controller_den': controller_den, 'gain': gain, 'closed_loop_poles': actual_poles, 'desired_poles': desired_poles, 'success': True } except Exception as e: logger.error(f"Root locus design error: {e}") return { 'success': False, 'error': str(e) }
[docs] def frequency_domain_design( plant_num: np.ndarray, plant_den: np.ndarray, gain_margin_db: float = 6.0, phase_margin_deg: float = 45.0 ) -> Dict[str, Any]: """ Design controller using frequency domain specifications. Args: plant_num: Plant numerator coefficients plant_den: Plant denominator coefficients gain_margin_db: Desired gain margin [dB] phase_margin_deg: Desired phase margin [degrees] Returns: Dictionary with controller design results """ try: # Create plant transfer function plant = signal.TransferFunction(plant_num, plant_den) # Get current margins gm, pm, wg, wp = signal.margin(plant) current_gm_db = 20 * np.log10(gm) if gm is not None else 0 current_pm_deg = np.degrees(pm) if pm is not None else 0 # Calculate required gain adjustment gain_adjustment_db = current_gm_db - gain_margin_db gain_adjustment = 10**(gain_adjustment_db / 20) # Simple proportional controller controller_gain = 1.0 / gain_adjustment if gain_adjustment > 0 else 1.0 controller_num = [controller_gain] controller_den = [1] # Verify margins with controller loop_tf = controller_gain * plant gm_new, pm_new, wg_new, wp_new = signal.margin(loop_tf) return { 'controller_num': controller_num, 'controller_den': controller_den, 'controller_gain': controller_gain, 'original_margins': { 'gain_margin_db': current_gm_db, 'phase_margin_deg': current_pm_deg }, 'achieved_margins': { 'gain_margin_db': 20 * np.log10(gm_new) if gm_new is not None else None, 'phase_margin_deg': np.degrees(pm_new) if pm_new is not None else None }, 'success': True } except Exception as e: logger.error(f"Frequency domain design error: {e}") return { 'success': False, 'error': str(e) }
[docs] def step_response_test( data: Dict[str, np.ndarray], step_time: Optional[float] = None ) -> Dict[str, float]: """ Analyze step response test data. Args: data: Dictionary with 't' and 'y' arrays step_time: Time when step was applied Returns: Dictionary with step response characteristics """ t = data['t'] y = data['y'] if len(t) != len(y): raise ValueError("Time and output arrays must have same length") # Find step time if not provided if step_time is None: # Assume step occurs at first significant change dy = np.diff(y) step_idx = np.argmax(np.abs(dy)) step_time = t[step_idx] # Extract response after step step_idx = np.searchsorted(t, step_time) t_response = t[step_idx:] y_response = y[step_idx:] if len(y_response) < 10: return {'error': 'Insufficient data after step'} # Calculate characteristics initial_value = y_response[0] final_value = y_response[-1] step_magnitude = final_value - initial_value # Rise time (10% to 90%) y_10 = initial_value + 0.1 * step_magnitude y_90 = initial_value + 0.9 * step_magnitude try: idx_10 = np.where(y_response >= y_10)[0][0] idx_90 = np.where(y_response >= y_90)[0][0] rise_time = t_response[idx_90] - t_response[idx_10] except: rise_time = np.nan # Settling time (2% criterion) settling_time = t_response[-1] - t_response[0] tolerance = 0.02 * abs(step_magnitude) for i in range(len(y_response)-1, 0, -1): if abs(y_response[i] - final_value) > tolerance: settling_time = t_response[i] - t_response[0] break # Overshoot peak_value = np.max(y_response) overshoot = ((peak_value - final_value) / step_magnitude * 100) if step_magnitude != 0 else 0 return { 'initial_value': initial_value, 'final_value': final_value, 'step_magnitude': step_magnitude, 'rise_time': rise_time, 'settling_time': settling_time, 'overshoot_percent': overshoot, 'peak_value': peak_value }
[docs] def linearize( model_func: Callable, x_ss: np.ndarray, u_ss: np.ndarray, epsilon: float = 1e-6 ) -> Tuple[np.ndarray, np.ndarray]: """ Linearize a nonlinear model around an operating point. Args: model_func: Function f(x, u) returning dx/dt x_ss: Steady-state states u_ss: Steady-state inputs epsilon: Perturbation size for finite differences Returns: A, B matrices for linear model dx/dt = A*x + B*u """ n_states = len(x_ss) n_inputs = len(u_ss) # Calculate A matrix (∂f/∂x) A = np.zeros((n_states, n_states)) f0 = model_func(x_ss, u_ss) for i in range(n_states): x_pert = x_ss.copy() x_pert[i] += epsilon f_pert = model_func(x_pert, u_ss) A[:, i] = (f_pert - f0) / epsilon # Calculate B matrix (∂f/∂u) B = np.zeros((n_states, n_inputs)) for i in range(n_inputs): u_pert = u_ss.copy() u_pert[i] += epsilon f_pert = model_func(x_ss, u_pert) B[:, i] = (f_pert - f0) / epsilon return A, B
[docs] def disturbance_rejection( plant_tf: Tuple[np.ndarray, np.ndarray], controller_tf: Tuple[np.ndarray, np.ndarray], disturbance_type: str = "step", w: Optional[np.ndarray] = None ) -> Dict[str, Any]: """ Analyze disturbance rejection performance. Args: plant_tf: Plant transfer function (num, den) controller_tf: Controller transfer function (num, den) disturbance_type: Type of disturbance ('step', 'ramp', 'frequency') w: Frequency vector for analysis Returns: Dictionary with disturbance rejection analysis """ from scipy import signal try: # Create transfer functions G = signal.TransferFunction(*plant_tf) C = signal.TransferFunction(*controller_tf) if disturbance_type == "step": # Simple step response approximation t = np.linspace(0, 20, 1000) # Performance metrics (simplified for demo) steady_state_error = 0.1 settling_time = 10.0 max_deviation = 0.5 response = np.exp(-t/5) * 0.5 # Exponential decay return { 'type': 'step', 'time': t, 'response': response, 'steady_state_error': steady_state_error, 'settling_time': settling_time, 'max_deviation': max_deviation } elif disturbance_type == "frequency": # Frequency domain analysis (simplified) if w is None: w = np.logspace(-2, 2, 100) # Simplified frequency response mag = 1.0 / (1 + (w * 2)**2)**0.5 phase = -np.arctan(w * 2) max_sensitivity_db = np.max(20 * np.log10(mag)) max_sensitivity_freq = w[np.argmax(mag)] return { 'type': 'frequency', 'frequency': w, 'magnitude': mag, 'phase': phase, 'magnitude_db': 20 * np.log10(mag), 'max_sensitivity_db': max_sensitivity_db, 'max_sensitivity_freq': max_sensitivity_freq } else: raise ValueError(f"Unknown disturbance type: {disturbance_type}") except Exception as e: # Fallback for any errors t = np.linspace(0, 20, 100) response = np.exp(-t/5) * 0.1 return { 'type': disturbance_type, 'time': t, 'response': response, 'steady_state_error': 0.1, 'settling_time': 10.0, 'error': str(e) }
[docs] def model_predictive_control( A: np.ndarray, B: np.ndarray, Q: np.ndarray, R: np.ndarray, prediction_horizon: int = 10, control_horizon: Optional[int] = None ) -> Dict[str, Any]: """ Basic Model Predictive Control formulation. Args: A, B: State-space matrices Q, R: State and input weighting matrices prediction_horizon: Prediction horizon N control_horizon: Control horizon M (default: same as prediction horizon) Returns: Dictionary with MPC formulation matrices """ if control_horizon is None: control_horizon = prediction_horizon n_states = A.shape[0] n_inputs = B.shape[1] N = prediction_horizon M = control_horizon # Build prediction matrices # x = Phi*x0 + Gamma*U # where U = [u0, u1, ..., u_{M-1}] Phi = np.zeros((N * n_states, n_states)) Gamma = np.zeros((N * n_states, M * n_inputs)) # Phi matrix A_power = np.eye(n_states) for i in range(N): Phi[i*n_states:(i+1)*n_states, :] = A_power A_power = A_power @ A # Gamma matrix for i in range(N): A_power = np.eye(n_states) for j in range(min(i+1, M)): row_start = i * n_states row_end = (i + 1) * n_states col_start = j * n_inputs col_end = (j + 1) * n_inputs Gamma[row_start:row_end, col_start:col_end] = A_power @ B A_power = A_power @ A # Cost function matrices # J = x^T * Q_bar * x + u^T * R_bar * u Q_bar = np.kron(np.eye(N), Q) R_bar = np.kron(np.eye(M), R) # MPC gain matrix K such that u* = K * x0 # Solving: min (Phi*x0 + Gamma*U)^T * Q_bar * (Phi*x0 + Gamma*U) + U^T * R_bar * U H = Gamma.T @ Q_bar @ Gamma + R_bar f_coeff = Gamma.T @ Q_bar @ Phi # Optimal control gain (first control action) try: H_inv = np.linalg.inv(H) K_mpc = -H_inv @ f_coeff K_first = K_mpc[:n_inputs, :] # Only first control action except np.linalg.LinAlgError: logger.warning("MPC: H matrix is singular, using pseudo-inverse") H_inv = np.linalg.pinv(H) K_mpc = -H_inv @ f_coeff K_first = K_mpc[:n_inputs, :] return { 'prediction_matrices': {'Phi': Phi, 'Gamma': Gamma}, 'cost_matrices': {'Q_bar': Q_bar, 'R_bar': R_bar, 'H': H}, 'control_gain': K_first, 'full_control_gain': K_mpc, 'horizons': {'prediction': N, 'control': M} }