Source code for sproclib.analysis.model_identification

"""
Model Identification for SPROCLIB

This module provides tools for system identification and model fitting
from experimental data or step response tests.

Author: Thorsten Gressling <gressling@paramus.ai>
License: MIT License
"""

import numpy as np
from typing import Optional, Tuple, Dict, Any, List
from scipy import optimize
import logging

logger = logging.getLogger(__name__)


[docs] class ModelIdentification: """System identification and model fitting tools."""
[docs] def __init__(self, name: str = "Model Identification"): """ Initialize model identification. Args: name: Identification name """ self.name = name self.results = {}
[docs] def fit_fopdt( self, t_data: np.ndarray, y_data: np.ndarray, step_magnitude: float = 1.0, initial_guess: Optional[Tuple[float, float, float]] = None ) -> Dict[str, Any]: """ Fit First Order Plus Dead Time model to step response data. Args: t_data: Time data y_data: Response data step_magnitude: Magnitude of step input initial_guess: Initial parameter guess (K, tau, theta) Returns: Dictionary with fitted parameters and metrics """ def fopdt_response(t, K, tau, theta): """FOPDT step response""" response = np.zeros_like(t) mask = t >= theta response[mask] = K * (1 - np.exp(-(t[mask] - theta) / tau)) return response def objective(params): K, tau, theta = params y_pred = fopdt_response(t_data, K, tau, theta) * step_magnitude return np.sum((y_data - y_pred) ** 2) # Initial guess if initial_guess is None: K_guess = np.max(y_data) / step_magnitude tau_guess = t_data[-1] / 5 # Rough estimate theta_guess = 0.1 initial_guess = (K_guess, tau_guess, theta_guess) # Optimization bounds bounds = [(0.1, 10 * initial_guess[0]), # K (0.01, 10 * initial_guess[1]), # tau (0, t_data[-1] / 2)] # theta try: result = optimize.minimize( objective, initial_guess, bounds=bounds, method='L-BFGS-B' ) if result.success: K_fit, tau_fit, theta_fit = result.x y_fit = fopdt_response(t_data, K_fit, tau_fit, theta_fit) * step_magnitude # Calculate R-squared ss_res = np.sum((y_data - y_fit) ** 2) ss_tot = np.sum((y_data - np.mean(y_data)) ** 2) r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0 logger.info(f"FOPDT fit successful: K={K_fit:.3f}, τ={tau_fit:.3f}, θ={theta_fit:.3f}") self.results = { 'K': K_fit, 'tau': tau_fit, 'theta': theta_fit, 'r_squared': r_squared, 'fitted_response': y_fit, 'residuals': y_data - y_fit, 'success': True } else: logger.error(f"FOPDT fit failed: {result.message}") self.results = { 'K': np.nan, 'tau': np.nan, 'theta': np.nan, 'r_squared': 0, 'success': False, 'message': result.message } except Exception as e: logger.error(f"FOPDT fit error: {e}") self.results = { 'K': np.nan, 'tau': np.nan, 'theta': np.nan, 'r_squared': 0, 'success': False, 'error': str(e) } return self.results
[docs] def fit_sopdt( self, t_data: np.ndarray, y_data: np.ndarray, step_magnitude: float = 1.0, initial_guess: Optional[Tuple[float, float, float, float]] = None ) -> Dict[str, Any]: """ Fit Second Order Plus Dead Time model to step response data. Args: t_data: Time data y_data: Response data step_magnitude: Magnitude of step input initial_guess: Initial parameter guess (K, tau1, tau2, theta) Returns: Dictionary with fitted parameters and metrics """ def sopdt_response(t, K, tau1, tau2, theta): """SOPDT step response""" response = np.zeros_like(t) mask = t >= theta t_active = t[mask] - theta if abs(tau1 - tau2) < 1e-6: # Repeated roots case tau = tau1 response[mask] = K * (1 - (1 + t_active/tau) * np.exp(-t_active/tau)) else: # Distinct roots case a1 = tau1 / (tau1 - tau2) a2 = tau2 / (tau2 - tau1) exp1 = np.exp(-t_active / tau1) exp2 = np.exp(-t_active / tau2) response[mask] = K * (1 + a1 * exp1 + a2 * exp2) return response def objective(params): K, tau1, tau2, theta = params y_pred = sopdt_response(t_data, K, tau1, tau2, theta) * step_magnitude return np.sum((y_data - y_pred) ** 2) # Initial guess if initial_guess is None: K_guess = np.max(y_data) / step_magnitude tau1_guess = t_data[-1] / 8 tau2_guess = t_data[-1] / 4 theta_guess = 0.1 initial_guess = (K_guess, tau1_guess, tau2_guess, theta_guess) # Optimization bounds bounds = [(0.1, 10 * initial_guess[0]), # K (0.01, 10 * initial_guess[1]), # tau1 (0.01, 10 * initial_guess[2]), # tau2 (0, t_data[-1] / 2)] # theta try: result = optimize.minimize( objective, initial_guess, bounds=bounds, method='L-BFGS-B' ) if result.success: K_fit, tau1_fit, tau2_fit, theta_fit = result.x y_fit = sopdt_response(t_data, K_fit, tau1_fit, tau2_fit, theta_fit) * step_magnitude # Calculate R-squared ss_res = np.sum((y_data - y_fit) ** 2) ss_tot = np.sum((y_data - np.mean(y_data)) ** 2) r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0 logger.info(f"SOPDT fit successful: K={K_fit:.3f}, τ1={tau1_fit:.3f}, τ2={tau2_fit:.3f}, θ={theta_fit:.3f}") return { 'K': K_fit, 'tau1': tau1_fit, 'tau2': tau2_fit, 'theta': theta_fit, 'r_squared': r_squared, 'fitted_response': y_fit, 'residuals': y_data - y_fit, 'success': True } else: logger.error(f"SOPDT fit failed: {result.message}") return { 'K': np.nan, 'tau1': np.nan, 'tau2': np.nan, 'theta': np.nan, 'r_squared': 0, 'success': False, 'message': result.message } except Exception as e: logger.error(f"SOPDT fit error: {e}") return { 'K': np.nan, 'tau1': np.nan, 'tau2': np.nan, 'theta': np.nan, 'r_squared': 0, 'success': False, 'error': str(e) }
[docs] def fit_transfer_function( self, freq_data: np.ndarray, magnitude_data: np.ndarray, phase_data: np.ndarray, model_order: Tuple[int, int] = (2, 2) ) -> Dict[str, Any]: """ Fit transfer function to frequency response data. Args: freq_data: Frequency data [rad/s] magnitude_data: Magnitude data (linear scale) phase_data: Phase data [radians] model_order: Transfer function order (numerator, denominator) Returns: Dictionary with fitted transfer function """ from scipy import signal # Convert to complex frequency response H_data = magnitude_data * np.exp(1j * phase_data) # Use vector fitting or least squares approach # This is a simplified implementation - in practice, use specialized tools num_order, den_order = model_order try: # Simplified fitting using least squares # Create complex frequency points s_data = 1j * freq_data # Set up least squares problem (simplified) # In practice, this requires more sophisticated vector fitting algorithms # For now, return a simple first-order fit if len(freq_data) > 2: # Simple magnitude-based fitting low_freq_gain = magnitude_data[0] if len(magnitude_data) > 0 else 1.0 # Estimate time constant from -3dB frequency mag_db = 20 * np.log10(magnitude_data + 1e-12) try: idx_3db = np.where(mag_db <= (20*np.log10(low_freq_gain) - 3))[0] if len(idx_3db) > 0: freq_3db = freq_data[idx_3db[0]] tau_est = 1.0 / freq_3db else: tau_est = 1.0 except: tau_est = 1.0 # Create simple first-order model num = [low_freq_gain] den = [tau_est, 1] tf_fit = signal.TransferFunction(num, den) # Calculate fit quality _, mag_fit, phase_fit = signal.bode(tf_fit, freq_data) mag_error = np.mean((magnitude_data - mag_fit)**2) phase_error = np.mean((phase_data - phase_fit)**2) return { 'numerator': num, 'denominator': den, 'transfer_function': tf_fit, 'magnitude_error': mag_error, 'phase_error': phase_error, 'success': True } else: return { 'success': False, 'error': 'Insufficient data points' } except Exception as e: logger.error(f"Transfer function fitting error: {e}") return { 'success': False, 'error': str(e) }
[docs] def validate_model( self, model_params: Dict[str, float], validation_data: Dict[str, np.ndarray], model_type: str = "fopdt" ) -> Dict[str, float]: """ Validate fitted model against independent validation data. Args: model_params: Fitted model parameters validation_data: Dictionary with 't' and 'y' arrays model_type: Type of model ('fopdt', 'sopdt') Returns: Dictionary with validation metrics """ t_val = validation_data['t'] y_val = validation_data['y'] if model_type.lower() == "fopdt": K = model_params['K'] tau = model_params['tau'] theta = model_params['theta'] # Generate model prediction y_pred = np.zeros_like(t_val) mask = t_val >= theta t_active = t_val[mask] - theta y_pred[mask] = K * (1 - np.exp(-t_active / tau)) elif model_type.lower() == "sopdt": K = model_params['K'] tau1 = model_params['tau1'] tau2 = model_params['tau2'] theta = model_params['theta'] # Generate model prediction y_pred = np.zeros_like(t_val) mask = t_val >= theta t_active = t_val[mask] - theta if abs(tau1 - tau2) < 1e-6: # Repeated roots tau = tau1 y_pred[mask] = K * (1 - (1 + t_active/tau) * np.exp(-t_active/tau)) else: # Distinct roots a1 = tau1 / (tau1 - tau2) a2 = tau2 / (tau2 - tau1) exp1 = np.exp(-t_active / tau1) exp2 = np.exp(-t_active / tau2) y_pred[mask] = K * (1 + a1 * exp1 + a2 * exp2) else: raise ValueError(f"Unknown model type: {model_type}") # Calculate validation metrics mse = np.mean((y_val - y_pred)**2) rmse = np.sqrt(mse) mae = np.mean(np.abs(y_val - y_pred)) # R-squared ss_res = np.sum((y_val - y_pred)**2) ss_tot = np.sum((y_val - np.mean(y_val))**2) r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0 return { 'mse': mse, 'rmse': rmse, 'mae': mae, 'r_squared': r_squared, 'predicted_response': y_pred }
# Standalone function wrappers for backward compatibility
[docs] def fit_fopdt( t_data: np.ndarray, y_data: np.ndarray, step_magnitude: float = 1.0, initial_guess: Optional[Tuple[float, float, float]] = None ) -> Dict[str, float]: """ Fit First Order Plus Dead Time model to step response data. Args: t_data: Time data y_data: Response data step_magnitude: Magnitude of step input initial_guess: Initial parameter guess (K, tau, theta) Returns: Dictionary with fitted parameters and metrics """ identifier = ModelIdentification() return identifier.fit_fopdt(t_data, y_data, step_magnitude, initial_guess)