Source code for sproclib.unit.valve.three_way.three_way_valve

"""
Three-Way Valve Model for SPROCLIB

This module provides a three-way valve model for flow
splitting/mixing applications with dead-time and flow coefficient calculations.

Author: Thorsten Gressling <gressling@paramus.ai>
License: MIT License
"""

import numpy as np
import logging
from typing import Tuple
from ...base import ProcessModel

logger = logging.getLogger(__name__)


[docs] class ThreeWayValve(ProcessModel): """ Three-way control valve model for flow splitting/mixing applications. Features: - Mixing and diverting configurations - Flow coefficient-based calculations - Dead-time in valve position - Flow distribution control """
[docs] def __init__(self, Cv_max: float = 100.0, valve_config: str = "mixing", # "mixing" or "diverting" dead_time: float = 0.5, time_constant: float = 2.0, name: str = "ThreeWayValve"): """ Initialize three-way valve model. Args: Cv_max: Maximum flow coefficient valve_config: "mixing" (two inlets, one outlet) or "diverting" (one inlet, two outlets) dead_time: Position dead-time (seconds) time_constant: Actuator time constant (seconds) name: Valve identifier """ super().__init__(name) self.Cv_max = Cv_max self.valve_config = valve_config self.dead_time = dead_time self.time_constant = time_constant # Dead-time buffer self.position_buffer = [] self.time_buffer = [] if valve_config == "mixing": # State: [position, flow_out] # Inputs: [position_command, P1_in, P2_in, P_out, rho] self.state_names = ['valve_position', 'flow_out'] elif valve_config == "diverting": # State: [position, flow_out1, flow_out2] # Inputs: [position_command, P_in, P1_out, P2_out, rho] self.state_names = ['valve_position', 'flow_out1', 'flow_out2'] else: raise ValueError("valve_config must be 'mixing' or 'diverting'") self.parameters.update({ 'Cv_max': Cv_max, 'valve_config': valve_config, 'dead_time': dead_time, 'time_constant': time_constant }) logger.info(f"Created 3-way valve {name}, config={valve_config}")
def _calculate_cv_split(self, position: float) -> Tuple[float, float]: """ Calculate flow coefficients for both paths based on position. Args: position: Valve position (0=fully path A, 1=fully path B) Returns: (Cv_A, Cv_B): Flow coefficients for paths A and B """ position = np.clip(position, 0.0, 1.0) # Linear characteristic for simplicity Cv_A = self.Cv_max * (1.0 - position) Cv_B = self.Cv_max * position return Cv_A, Cv_B def _update_dead_time_buffer(self, t: float, position_command: float): """Update dead-time buffer.""" self.time_buffer.append(t) self.position_buffer.append(position_command) while (self.time_buffer and self.time_buffer[0] < t - self.dead_time - 0.1): self.time_buffer.pop(0) self.position_buffer.pop(0) def _get_delayed_position(self, t: float) -> float: """Get delayed valve position.""" target_time = t - self.dead_time if not self.time_buffer or target_time < self.time_buffer[0]: return 0.0 if target_time >= self.time_buffer[-1]: return self.position_buffer[-1] for i in range(len(self.time_buffer) - 1): if (self.time_buffer[i] <= target_time <= self.time_buffer[i + 1]): t1, t2 = self.time_buffer[i], self.time_buffer[i + 1] p1, p2 = self.position_buffer[i], self.position_buffer[i + 1] return p1 + (p2 - p1) * (target_time - t1) / (t2 - t1) return 0.0
[docs] def dynamics(self, t: float, x: np.ndarray, u: np.ndarray) -> np.ndarray: """ Three-way valve dynamics. For mixing: u = [position_command, P1_in, P2_in, P_out, rho] For diverting: u = [position_command, P_in, P1_out, P2_out, rho] """ if self.valve_config == "mixing": return self._mixing_dynamics(t, x, u) else: return self._diverting_dynamics(t, x, u)
def _mixing_dynamics(self, t: float, x: np.ndarray, u: np.ndarray) -> np.ndarray: """Mixing valve dynamics.""" position, flow_out = x position_cmd, P1_in, P2_in, P_out, rho = u self._update_dead_time_buffer(t, position_cmd) delayed_cmd = self._get_delayed_position(t) # Position dynamics dpos_dt = (delayed_cmd - position) / self.time_constant # Flow calculation Cv_A, Cv_B = self._calculate_cv_split(position) # Calculate flows from both inlets Cv_si = 6.309e-5 # Conversion factor delta_P1 = max(0, P1_in - P_out) delta_P2 = max(0, P2_in - P_out) flow1 = Cv_A * Cv_si * np.sqrt(delta_P1 / rho) if delta_P1 > 0 else 0 flow2 = Cv_B * Cv_si * np.sqrt(delta_P2 / rho) if delta_P2 > 0 else 0 desired_flow_out = flow1 + flow2 # Flow response flow_tau = self.time_constant / 10.0 dflow_dt = (desired_flow_out - flow_out) / flow_tau return np.array([dpos_dt, dflow_dt]) def _diverting_dynamics(self, t: float, x: np.ndarray, u: np.ndarray) -> np.ndarray: """Diverting valve dynamics.""" position, flow_out1, flow_out2 = x position_cmd, P_in, P1_out, P2_out, rho = u self._update_dead_time_buffer(t, position_cmd) delayed_cmd = self._get_delayed_position(t) # Position dynamics dpos_dt = (delayed_cmd - position) / self.time_constant # Flow calculation Cv_A, Cv_B = self._calculate_cv_split(position) Cv_si = 6.309e-5 delta_P1 = max(0, P_in - P1_out) delta_P2 = max(0, P_in - P2_out) desired_flow1 = Cv_A * Cv_si * np.sqrt(delta_P1 / rho) if delta_P1 > 0 else 0 desired_flow2 = Cv_B * Cv_si * np.sqrt(delta_P2 / rho) if delta_P2 > 0 else 0 # Flow responses flow_tau = self.time_constant / 10.0 dflow1_dt = (desired_flow1 - flow_out1) / flow_tau dflow2_dt = (desired_flow2 - flow_out2) / flow_tau return np.array([dpos_dt, dflow1_dt, dflow2_dt])
[docs] def steady_state(self, u: np.ndarray) -> np.ndarray: """Calculate steady-state flows.""" if self.valve_config == "mixing": position_cmd, P1_in, P2_in, P_out, rho = u position = np.clip(position_cmd, 0.0, 1.0) Cv_A, Cv_B = self._calculate_cv_split(position) Cv_si = 6.309e-5 delta_P1 = max(0, P1_in - P_out) delta_P2 = max(0, P2_in - P_out) flow1 = Cv_A * Cv_si * np.sqrt(delta_P1 / rho) if delta_P1 > 0 else 0 flow2 = Cv_B * Cv_si * np.sqrt(delta_P2 / rho) if delta_P2 > 0 else 0 return np.array([position, flow1 + flow2]) else: # diverting position_cmd, P_in, P1_out, P2_out, rho = u position = np.clip(position_cmd, 0.0, 1.0) Cv_A, Cv_B = self._calculate_cv_split(position) Cv_si = 6.309e-5 delta_P1 = max(0, P_in - P1_out) delta_P2 = max(0, P_in - P2_out) flow1 = Cv_A * Cv_si * np.sqrt(delta_P1 / rho) if delta_P1 > 0 else 0 flow2 = Cv_B * Cv_si * np.sqrt(delta_P2 / rho) if delta_P2 > 0 else 0 return np.array([position, flow1, flow2])
[docs] def get_flow_split(self, position: float) -> Tuple[float, float]: """Get flow split percentages for given position.""" Cv_A, Cv_B = self._calculate_cv_split(position) total_Cv = Cv_A + Cv_B if total_Cv > 0: return Cv_A / total_Cv, Cv_B / total_Cv return 0.5, 0.5
[docs] def describe(self) -> dict: """ Introspect metadata for documentation and algorithm querying. Returns: dict: Metadata about the model including algorithms, parameters, equations, and usage information. """ return { 'type': 'ThreeWayValve', 'description': 'Three-way control valve for flow mixing or diverting applications with dead-time and flow coefficient modeling', 'category': 'unit/valve', 'algorithms': { 'flow_splitting': 'Cv_A = Cv_max * (1-position), Cv_B = Cv_max * position', 'mixing_flow': 'Q_out = Q_inlet1 + Q_inlet2 for mixing configuration', 'diverting_flow': 'Q_inlet = Q_outlet1 + Q_outlet2 for diverting configuration', 'flow_calculation': 'Q = Cv * sqrt(ΔP/ρ) for each flow path', 'dead_time_modeling': 'Transport delay with linear interpolation' }, 'parameters': { 'Cv_max': { 'value': self.Cv_max, 'units': 'gpm/psi^0.5', 'description': 'Maximum flow coefficient for single path' }, 'valve_config': { 'value': self.valve_config, 'units': 'dimensionless', 'description': 'Valve configuration (mixing or diverting)' }, 'dead_time': { 'value': self.dead_time, 'units': 's', 'description': 'Actuator dead time delay' }, 'time_constant': { 'value': self.time_constant, 'units': 's', 'description': 'Actuator time constant' } }, 'state_variables': self.state_names, 'inputs': ['position_command'] + (['inlet1_pressure', 'inlet2_pressure', 'outlet_pressure'] if self.valve_config == 'mixing' else ['inlet_pressure', 'outlet1_pressure', 'outlet2_pressure']) + ['fluid_density'], 'outputs': self.state_names, 'valid_ranges': { 'Cv_max': {'min': 1.0, 'max': 1000.0, 'units': 'gpm/psi^0.5'}, 'position_command': {'min': 0.0, 'max': 1.0, 'units': 'fraction'}, 'pressure': {'min': 0.0, 'max': 1.0e7, 'units': 'Pa'}, 'dead_time': {'min': 0.0, 'max': 60.0, 'units': 's'} }, 'applications': ['Stream mixing in chemical reactors', 'Flow diversion for different process units', 'Temperature control via hot/cold stream mixing', 'Bypass control systems', 'Product blending operations'], 'limitations': ['Assumes incompressible flow', 'Linear flow coefficient splitting', 'Single-phase fluid only', 'No interaction between flow paths', 'Constant fluid properties assumed'] }