# Source code for sproclib.utilities.data_utils

"""
Data Utilities for SPROCLIB

This module provides data processing utilities for process control
including filtering, resampling, and data analysis functions.

Author: Thorsten Gressling <gressling@paramus.ai>
License: MIT License
"""

import numpy as np
from typing import Optional, Tuple, Dict, Any, List
from scipy import signal
import logging

logger = logging.getLogger(__name__)


def filter_data(
    data: np.ndarray,
    filter_type: str = 'lowpass',
    cutoff: float = 0.1,
    order: int = 2,
    method: str = 'butterworth',
    ripple: float = 1.0,
) -> np.ndarray:
    """
    Filter time series data with a zero-phase (forward-backward) IIR filter.

    Args:
        data: Input data array
        filter_type: Type of filter ('lowpass', 'highpass', 'bandpass')
        cutoff: Cutoff frequency, normalized so that 1.0 is the Nyquist
            frequency. For 'bandpass' pass a pair ``(low, high)`` as a
            list, tuple or 1-D array.
        order: Filter order
        method: Filter design ('butterworth', 'chebyshev', 'bessel').
            'chebyshev' designs a Chebyshev type I filter.
        ripple: Maximum passband ripple in dB; only used by 'chebyshev'.

    Returns:
        Filtered data array. If the filter cannot be designed or applied
        (unknown type/method, invalid cutoff, data too short, ...) the error
        is logged and the original ``data`` is returned unchanged.
    """
    try:
        # Design functions keyed by method name; each returns (b, a).
        designers = {
            'butterworth': lambda btype: signal.butter(order, cutoff, btype=btype),
            'chebyshev': lambda btype: signal.cheby1(order, ripple, cutoff, btype=btype),
            'bessel': lambda btype: signal.bessel(order, cutoff, btype=btype),
        }
        btypes = {'lowpass': 'low', 'highpass': 'high', 'bandpass': 'band'}

        design = designers.get(method.lower())
        if design is None:
            raise ValueError(f"Unknown filter method: {method}")

        kind = filter_type.lower()
        if kind not in btypes:
            raise ValueError(f"Unknown filter type: {filter_type}")

        # A band needs exactly two edges; accept any 1-D pair (list, tuple
        # or ndarray), not only list/tuple.
        if kind == 'bandpass' and (np.ndim(cutoff) != 1 or np.size(cutoff) != 2):
            raise ValueError("Bandpass filter requires two cutoff frequencies")

        b, a = design(btypes[kind])

        # filtfilt runs the filter forward and backward, so the result has
        # no phase shift relative to the input.
        return signal.filtfilt(b, a, data)

    except Exception as e:
        # Deliberate best-effort: callers get their data back untouched.
        logger.error(f"Data filtering error: {e}")
        return data
def resample_data(
    t: np.ndarray,
    data: np.ndarray,
    new_dt: float,
    method: str = 'linear'
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Resample time series data onto a uniformly spaced time grid.

    Args:
        t: Original time array
        data: Original data array (sampled at ``t``)
        new_dt: New sampling interval
        method: Interpolation kind passed to ``scipy.interpolate.interp1d``

    Returns:
        Tuple ``(t_new, data_new)``. The new grid starts at ``t[0]`` and
        advances in steps of ``new_dt`` up to, but not including, ``t[-1]``.
        If anything fails, the error is logged and the original
        ``(t, data)`` pair is returned.
    """
    from scipy.interpolate import interp1d

    try:
        # Half-open grid [t[0], t[-1]) as produced by np.arange.
        first, last = t[0], t[-1]
        grid = np.arange(first, last, new_dt)

        # Out-of-range points are extrapolated instead of raising.
        resampler = interp1d(
            t,
            data,
            kind=method,
            bounds_error=False,
            fill_value='extrapolate',
        )
        resampled = resampler(grid)
    except Exception as e:
        logger.error(f"Data resampling error: {e}")
        return t, data

    return grid, resampled
def detect_outliers(
    data: np.ndarray,
    method: str = 'iqr',
    threshold: float = 1.5
) -> np.ndarray:
    """
    Detect outliers in data.

    Args:
        data: Input data (array or any sequence convertible to an array)
        method: Detection method ('iqr', 'zscore', 'modified_zscore')
        threshold: Threshold for outlier detection. For 'iqr' it is the
            multiple of the inter-quartile range beyond the quartiles; for
            the z-score methods it is the absolute score above which a
            point is flagged. The same default (1.5) applies to all
            methods, so pass an explicit value for the z-score methods.

    Returns:
        Boolean array (same shape as the input) that is True at outliers.
        On error the problem is logged and an all-False mask is returned.
    """
    values = None
    try:
        # Coerce once so plain lists work in the comparisons below.
        values = np.asarray(data)
        detector = method.lower()

        if detector == 'iqr':
            q1, q3 = np.percentile(values, [25, 75])
            spread = q3 - q1
            lower = q1 - threshold * spread
            upper = q3 + threshold * spread
            return (values < lower) | (values > upper)

        if detector == 'zscore':
            std = np.std(values)
            if std == 0:
                # Constant data: no point deviates, and dividing by zero
                # would only produce NaNs and a RuntimeWarning.
                return np.zeros(values.shape, dtype=bool)
            return np.abs((values - np.mean(values)) / std) > threshold

        if detector == 'modified_zscore':
            deviations = np.abs(values - np.median(values))
            mad = np.median(deviations)
            if mad == 0:
                # More than half of the points equal the median, so every
                # other point has an unbounded score. Flag those directly
                # instead of dividing by zero.
                return deviations > 0
            return 0.6745 * deviations / mad > threshold

        raise ValueError(f"Unknown outlier detection method: {method}")

    except Exception as e:
        logger.error(f"Outlier detection error: {e}")
        # Match the input's shape when it could be converted; otherwise
        # fall back to its length.
        shape = values.shape if values is not None else (len(data),)
        return np.zeros(shape, dtype=bool)
def smooth_data(
    data: np.ndarray,
    window_size: int = 5,
    method: str = 'moving_average'
) -> np.ndarray:
    """
    Smooth data using various methods.

    Args:
        data: Input data (array or any sequence convertible to an array)
        window_size: Size of smoothing window
        method: Smoothing method ('moving_average', 'savgol', 'exponential')

            * 'moving_average': convolution with a uniform window using
              ``np.convolve(..., mode='same')``; samples near the edges are
              averaged against implicit zeros.
            * 'savgol': Savitzky-Golay filter with polynomial order
              ``min(3, window_size - 1)``.
            * 'exponential': first-order exponential smoothing with
              ``alpha = 2 / (window_size + 1)``.

    Returns:
        Smoothed data array. On error the problem is logged and the
        original ``data`` is returned unchanged.
    """
    try:
        values = np.asarray(data)
        smoother = method.lower()

        if smoother == 'moving_average':
            kernel = np.ones(window_size) / window_size
            return np.convolve(values, kernel, mode='same')

        if smoother == 'savgol':
            polyorder = min(3, window_size - 1)
            return signal.savgol_filter(values, window_size, polyorder)

        if smoother == 'exponential':
            alpha = 2.0 / (window_size + 1)
            # The smoothed values are fractional even for integer input, so
            # the output buffer must be floating point; an integer buffer
            # would silently truncate every sample.
            if np.issubdtype(values.dtype, np.inexact):
                out_dtype = values.dtype
            else:
                out_dtype = np.float64
            smoothed = np.zeros(values.shape, dtype=out_dtype)
            smoothed[0] = values[0]
            for i in range(1, len(values)):
                smoothed[i] = alpha * values[i] + (1 - alpha) * smoothed[i - 1]
            return smoothed

        raise ValueError(f"Unknown smoothing method: {method}")

    except Exception as e:
        # Deliberate best-effort: callers get their data back untouched.
        logger.error(f"Data smoothing error: {e}")
        return data
def calculate_statistics(data: np.ndarray) -> Dict[str, float]:
    """
    Summarize data with a set of basic descriptive statistics.

    Args:
        data: Input data array

    Returns:
        Dictionary with the keys 'mean', 'median', 'std', 'var', 'min',
        'max', 'range', 'skewness' and 'kurtosis'. If any computation
        raises, the error is logged and an empty dictionary is returned.
    """
    try:
        # NumPy reductions, evaluated in the order they appear in the result.
        reducers = (
            ('mean', np.mean),
            ('median', np.median),
            ('std', np.std),
            ('var', np.var),
            ('min', np.min),
            ('max', np.max),
        )
        summary = {label: reduce(data) for label, reduce in reducers}
        summary['range'] = summary['max'] - summary['min']

        # Shape statistics come from the module's own helpers.
        summary['skewness'] = calculate_skewness(data)
        summary['kurtosis'] = calculate_kurtosis(data)
        return summary
    except Exception as e:
        logger.error(f"Statistics calculation error: {e}")
        return {}
def calculate_skewness(data: np.ndarray) -> float:
    """
    Calculate the sample skewness of data.

    Uses the bias-corrected (adjusted Fisher-Pearson) estimator

        G1 = n / ((n - 1) * (n - 2)) * sum(((x - mean) / s) ** 3)

    where ``s`` is the *sample* standard deviation (``ddof=1``). This
    matches ``scipy.stats.skew(data, bias=False)``.

    Args:
        data: Input data (array or any sequence of numbers)

    Returns:
        Skewness as a float. Returns 0.0 when it is undefined: fewer than
        three samples, constant data, or input that cannot be processed.
    """
    try:
        values = np.asarray(data, dtype=float)
        n = values.size

        # The correction factor divides by (n - 1) * (n - 2).
        if n < 3:
            return 0.0

        # Detect constant data exactly; round-off in the mean can make the
        # standard deviation of identical values tiny but non-zero.
        if values.max() == values.min():
            return 0.0

        # ddof=1: the correction factor above assumes the sample std.
        std = np.std(values, ddof=1)
        if std == 0:
            return 0.0

        z = (values - np.mean(values)) / std
        return float(n / ((n - 1) * (n - 2)) * np.sum(z ** 3))
    except Exception:
        # Deliberate best-effort: a statistic that cannot be computed is
        # reported as 0.0 rather than raising.
        return 0.0
def calculate_kurtosis(data: np.ndarray) -> float:
    """
    Calculate the sample excess kurtosis of data.

    Uses the bias-corrected estimator

        G2 = n (n + 1) / ((n - 1) (n - 2) (n - 3)) * sum(((x - mean) / s) ** 4)
             - 3 (n - 1)^2 / ((n - 2) (n - 3))

    where ``s`` is the *sample* standard deviation (``ddof=1``). This
    matches ``scipy.stats.kurtosis(data, fisher=True, bias=False)``; a
    normal distribution has excess kurtosis 0.

    Args:
        data: Input data (array or any sequence of numbers)

    Returns:
        Excess kurtosis as a float. Returns 0.0 when it is undefined:
        fewer than four samples, constant data, or input that cannot be
        processed.
    """
    try:
        values = np.asarray(data, dtype=float)
        n = values.size

        # The correction factors divide by (n - 1) * (n - 2) * (n - 3).
        if n < 4:
            return 0.0

        # Detect constant data exactly; round-off in the mean can make the
        # standard deviation of identical values tiny but non-zero.
        if values.max() == values.min():
            return 0.0

        # ddof=1: the correction factors above assume the sample std.
        std = np.std(values, ddof=1)
        if std == 0:
            return 0.0

        z = (values - np.mean(values)) / std
        scale = n * (n + 1) / ((n - 1) * (n - 2) * (n - 3))
        offset = 3 * (n - 1) ** 2 / ((n - 2) * (n - 3))
        return float(scale * np.sum(z ** 4) - offset)
    except Exception:
        # Deliberate best-effort: a statistic that cannot be computed is
        # reported as 0.0 rather than raising.
        return 0.0
# Public API of this module, one name per line.
__all__ = [
    'filter_data',
    'resample_data',
    'detect_outliers',
    'smooth_data',
    'calculate_statistics',
    'calculate_skewness',
    'calculate_kurtosis',
]