devjas1 committed on
Commit
8475e7b
·
1 Parent(s): 8d21c18

FEAT(spectroscopy): Develop advanced multi-modal processing engine


- Implements a sophisticated framework for processing and fusing multi-modal spectroscopy data, including FTIR, ATR-FTIR, and Raman.
- Introduces the `AdvancedPreprocessor` class, which provides a comprehensive suite of tools for spectral data enhancement:
  - **Baseline Correction:** Advanced algorithms including airPLS, ALS, polynomial fitting, and rolling-ball methods.
  - **Normalization:** Multiple strategies such as vector, min-max, standard (Z-score), area, and peak normalization.
  - **Denoising:** A range of noise reduction filters including Savitzky-Golay, Gaussian, median, and Wiener filters.
  - **Technique-Specific Adjustments:** Specialized corrections for ATR (penetration depth), Raman (cosmic ray and fluorescence), and standard FTIR (atmospheric compensation).
- Features the `MultiModalSpectroscopyEngine` for integrated analysis (usage sketch after this list):
  - **Data Fusion:** Strategies for combining data from multiple spectral sources, including concatenation, weighted averaging, PCA-based fusion, and an attention mechanism.
  - **Quality Assessment:** A spectral quality scoring system that evaluates signal-to-noise ratio, peak prominence, and baseline stability.
  - **Automated Recommendations:** Suggests suitable spectroscopy techniques based on sample type.
- Defines clear data structures for spectroscopy types and their characteristics, ensuring a well-organized and extensible module.
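A minimal usage sketch of the intended flow, for review purposes. The class and method names come from the diff below; the spectra are synthetic and the config values are illustrative, not project defaults:

```python
import numpy as np
from modules.advanced_spectroscopy import (
    MultiModalSpectroscopyEngine,
    SpectroscopyType,
)

engine = MultiModalSpectroscopyEngine()

# Two synthetic spectra standing in for real ATR-FTIR / Raman measurements
wn_ftir = np.linspace(600.0, 4000.0, 1700)
ftir = np.exp(-((wn_ftir - 1715.0) ** 2) / 500.0) + 0.05 * np.random.rand(1700)
wn_raman = np.linspace(200.0, 3500.0, 1650)
raman = np.exp(-((wn_raman - 1600.0) ** 2) / 300.0) + 0.05 * np.random.rand(1650)

ftir_id = engine.register_spectrum(wn_ftir, ftir, SpectroscopyType.ATR_FTIR)
raman_id = engine.register_spectrum(wn_raman, raman, SpectroscopyType.RAMAN)

# Baseline correction, technique-specific fixes, denoising, normalization
for sid in (ftir_id, raman_id):
    result = engine.preprocess_spectrum(sid, {"baseline_method": "airpls"})
    print(sid, "quality:", result["quality_score"])

# Quality-weighted fusion onto the common (overlapping) wavenumber grid
fused = engine.fuse_spectra([ftir_id, raman_id], fusion_strategy="weighted_average")
print(fused["weights"], fused["common_range"])

# Technique recommendations for a sample type
for rec in engine.get_technique_recommendations("weathered_polymer"):
    print(rec["technique"], rec["priority"], "-", rec["rationale"])
```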

Files changed (1)
  1. modules/advanced_spectroscopy.py +845 -0
modules/advanced_spectroscopy.py ADDED
@@ -0,0 +1,845 @@
+ """Advanced Spectroscopy Integration Module
+ Supports dual FTIR + Raman spectroscopy with ATR-FTIR integration."""
+
+ import numpy as np
+ # "trapz" was removed in SciPy 1.14; "trapezoid" is the supported name
+ from scipy.integrate import trapezoid
+ from typing import Dict, List, Tuple, Optional
+ from dataclasses import dataclass
+ from scipy import signal
+ import scipy.sparse as sparse
+ from scipy.sparse.linalg import spsolve
+ from scipy.interpolate import interp1d
+ from sklearn.preprocessing import StandardScaler, MinMaxScaler
+ from sklearn.decomposition import PCA
+ from scipy.signal import find_peaks
+ from scipy.ndimage import gaussian_filter1d
+
+
+ @dataclass
+ class SpectroscopyType:
+     """Define spectroscopy types and their characteristics"""
+
+     FTIR = "FTIR"
+     ATR_FTIR = "ATR-FTIR"
+     RAMAN = "Raman"
+     TRANSMISSION_FTIR = "Transmission-FTIR"
+     REFLECTION_FTIR = "Reflection-FTIR"
+
+
+ @dataclass
+ class SpectralCharacteristics:
+     """Characteristics of different spectroscopy techniques"""
+
+     technique: str
+     wavenumber_range: Tuple[float, float]  # cm-1
+     typical_resolution: float  # cm-1
+     sample_requirements: str
+     penetration_depth: Optional[str] = None
+     advantages: Optional[List[str]] = None
+     limitations: Optional[List[str]] = None
+
+
+ # Define characteristics for each technique
+ SPECTRAL_CHARACTERISTICS = {
+     SpectroscopyType.FTIR: SpectralCharacteristics(
+         technique="FTIR",
+         wavenumber_range=(400.0, 4000.0),
+         typical_resolution=4.0,
+         sample_requirements="Various (solid, liquid, gas)",
+         penetration_depth="Variable",
+         advantages=["High spectral resolution", "Wide range", "Quantitative"],
+         limitations=["Water interference", "Sample preparation"],
+     ),
+     SpectroscopyType.ATR_FTIR: SpectralCharacteristics(
+         technique="ATR-FTIR",
+         wavenumber_range=(600.0, 4000.0),
+         typical_resolution=4.0,
+         sample_requirements="Direct solid contact",
+         penetration_depth="0.5-2 μm",
+         advantages=["Minimal sample prep", "Solid samples", "Quick analysis"],
+         limitations=["Surface analysis only", "Pressure sensitive"],
+     ),
+     SpectroscopyType.RAMAN: SpectralCharacteristics(
+         technique="Raman",
+         wavenumber_range=(200.0, 3500.0),
+         typical_resolution=1.0,
+         sample_requirements="Various (solid, liquid)",
+         penetration_depth="Variable",
+         advantages=["Water compatible", "Non-destructive", "Molecular vibrations"],
+         limitations=["Fluorescence interference", "Weak signals"],
+     ),
+ }
+
+
+ class AdvancedPreprocessor:
+     """Advanced preprocessing pipeline for multi-modal spectroscopy data"""
+
+     def __init__(self):
+         self.techniques_applied = []
+         self.preprocessing_log = []
+
+     def baseline_correction(
+         self,
+         wavenumbers: np.ndarray,
+         intensities: np.ndarray,
+         method: str = "airpls",
+         **kwargs,
+     ) -> Tuple[np.ndarray, Dict]:
+         """
+         Advanced baseline correction methods
+
+         Args:
+             wavenumbers: Wavenumber array
+             intensities: Intensity array
+             method: Baseline correction method ('airpls', 'als', 'polynomial', 'rolling_ball')
+             **kwargs: Method-specific parameters
+
+         Returns:
+             Corrected intensities and processing metadata
+         """
+         metadata = {
+             "method": method,
+             "original_range": (intensities.min(), intensities.max()),
+         }
+         # Unknown methods fall through and return the unmodified copy
+         corrected_intensities = intensities.copy()
+
+         if method == "airpls":
+             corrected_intensities = self._airpls_baseline(intensities, **kwargs)
+         elif method == "als":
+             corrected_intensities = self._als_baseline(intensities, **kwargs)
+         elif method == "polynomial":
+             degree = kwargs.get("degree", 3)
+             coeffs = np.polyfit(wavenumbers, intensities, degree)
+             baseline = np.polyval(coeffs, wavenumbers)
+             corrected_intensities = intensities - baseline
+             metadata["polynomial_degree"] = degree
+         elif method == "rolling_ball":
+             ball_radius = kwargs.get("radius", 50)
+             corrected_intensities = self._rolling_ball_baseline(
+                 intensities, ball_radius
+             )
+             metadata["ball_radius"] = ball_radius
+
+         self.preprocessing_log.append(f"Baseline correction: {method}")
+         metadata["corrected_range"] = (
+             corrected_intensities.min(),
+             corrected_intensities.max(),
+         )
+
+         return corrected_intensities, metadata
+
+     def _airpls_baseline(
+         self, y: np.ndarray, lambda_: float = 1e4, itermax: int = 15
+     ) -> np.ndarray:
+         """
+         Adaptive Iteratively Reweighted Penalized Least Squares baseline correction
+         """
+         m = len(y)
+         # Second-difference operator; lambda_ scales the smoothness penalty
+         D = sparse.diags([1, -2, 1], offsets=[0, -1, -2], shape=(m, m - 2))
+         D = lambda_ * D.dot(D.transpose())
+         w = np.ones(m)
+
+         for _ in range(itermax):
+             W = sparse.spdiags(w, 0, m, m)
+             Z = W + D
+             z = spsolve(Z, w * y)
+             d = y - z
+             dn = d[d < 0]
+
+             m_dn = np.mean(dn) if len(dn) > 0 else 0.0
+             s_dn = np.std(dn) if len(dn) > 1 else 1.0
+             if s_dn == 0:
+                 s_dn = 1.0  # guard: a flat negative residual would divide by zero
+
+             wt = 1.0 / (1 + np.exp(2 * (d - (2 * s_dn - m_dn)) / s_dn))
+
+             if np.linalg.norm(w - wt) / np.linalg.norm(w) < 1e-9:
+                 break
+             w = wt
+
+         z = spsolve(sparse.spdiags(w, 0, m, m) + D, w * y)
+         return y - z
+
+     def _als_baseline(
+         self, y: np.ndarray, lambda_: float = 1e4, p: float = 0.001
+     ) -> np.ndarray:
+         """
+         Asymmetric Least Squares baseline correction
+         """
+         m = len(y)
+         D = sparse.diags([1, -2, 1], offsets=[0, -1, -2], shape=(m, m - 2))
+         D_t_D = D.dot(D.transpose())
+         w = np.ones(m)
+
+         for _ in range(10):
+             W = sparse.spdiags(w, 0, m, m)
+             Z = W + lambda_ * D_t_D
+             z = spsolve(Z, w * y)
+             # Points above the baseline get weight p, points below get 1 - p
+             w = p * (y > z) + (1 - p) * (y < z)
+
+         return y - z
+
+     def _rolling_ball_baseline(self, y: np.ndarray, radius: int) -> np.ndarray:
+         """
+         Rolling ball baseline correction (local-minimum approximation)
+         """
+         n = len(y)
+         baseline = np.zeros_like(y)
+
+         for i in range(n):
+             start = max(0, i - radius)
+             end = min(n, i + radius + 1)
+             baseline[i] = np.min(y[start:end])
+
+         return y - baseline
+
+     def normalization(
+         self,
+         wavenumbers: np.ndarray,
+         intensities: np.ndarray,
+         method: str = "vector",
+         **kwargs,
+     ) -> Tuple[np.ndarray, Dict]:
+         """
+         Advanced normalization methods for spectroscopy data
+
+         Args:
+             wavenumbers: Wavenumber array
+             intensities: Intensity array
+             method: Normalization method ('vector', 'min_max', 'standard', 'area', 'peak')
+             **kwargs: Method-specific parameters
+
+         Returns:
+             Normalized intensities and processing metadata
+         """
+         normalized_intensities = intensities.copy()
+         metadata = {"method": method, "original_std": np.std(intensities)}
+
+         if method == "vector":
+             norm = np.linalg.norm(intensities)
+             normalized_intensities = intensities / norm if norm > 0 else intensities
+             metadata["norm_value"] = norm
+         elif method == "min_max":
+             scaler = MinMaxScaler()
+             normalized_intensities = scaler.fit_transform(
+                 intensities.reshape(-1, 1)
+             ).flatten()
+             metadata["min_value"] = scaler.data_min_[0]
+             metadata["max_value"] = scaler.data_max_[0]
+         elif method == "standard":
+             scaler = StandardScaler()
+             normalized_intensities = scaler.fit_transform(
+                 intensities.reshape(-1, 1)
+             ).flatten()
+             metadata["mean"] = scaler.mean_[0] if scaler.mean_ is not None else None
+             metadata["std"] = scaler.scale_[0] if scaler.scale_ is not None else None
+         elif method == "area":
+             area = trapezoid(np.abs(intensities), wavenumbers)
+             normalized_intensities = intensities / area if area > 0 else intensities
+             metadata["area"] = area
+         elif method == "peak":
+             peak_idx = kwargs.get("peak_idx", np.argmax(np.abs(intensities)))
+             peak_value = intensities[peak_idx]
+             normalized_intensities = (
+                 intensities / peak_value if peak_value != 0 else intensities
+             )
+             metadata["peak_wavenumber"] = wavenumbers[peak_idx]
+             metadata["peak_value"] = peak_value
+
+         self.preprocessing_log.append(f"Normalization: {method}")
+         metadata["normalized_std"] = np.std(normalized_intensities)
+
+         return normalized_intensities, metadata
+
+     def noise_reduction(
+         self,
+         wavenumbers: np.ndarray,
+         intensities: np.ndarray,
+         method: str = "savgol",
+         **kwargs,
+     ) -> Tuple[np.ndarray, Dict]:
+         """
+         Advanced noise reduction techniques
+
+         Args:
+             wavenumbers: Wavenumber array
+             intensities: Intensity array
+             method: Denoising method ('savgol', 'wiener', 'median', 'gaussian')
+             **kwargs: Method-specific parameters
+
+         Returns:
+             Denoised intensities and processing metadata
+         """
+         denoised_intensities = intensities.copy()
+         metadata = {
+             "method": method,
+             "original_noise_level": np.std(np.diff(intensities)),
+         }
+
+         if method == "savgol":
+             window_length = kwargs.get("window_length", 11)
+             polyorder = kwargs.get("polyorder", 3)
+
+             # Clamp the window to a valid size, then force it odd as
+             # savgol_filter requires
+             window_length = max(window_length, polyorder + 1)
+             window_length = min(window_length, len(intensities) - 1)
+             if window_length % 2 == 0:
+                 window_length += 1
+
+             if window_length >= 3 and window_length > polyorder:
+                 denoised_intensities = signal.savgol_filter(
+                     intensities, window_length, polyorder
+                 )
+                 metadata["window_length"] = window_length
+                 metadata["polyorder"] = polyorder
+         elif method == "gaussian":
+             sigma = kwargs.get("sigma", 1.0)
+             denoised_intensities = gaussian_filter1d(intensities, sigma)
+             metadata["sigma"] = sigma
+         elif method == "median":
+             kernel_size = kwargs.get("kernel_size", 5)
+             if kernel_size % 2 == 0:
+                 kernel_size += 1  # medfilt requires an odd kernel size
+             denoised_intensities = signal.medfilt(intensities, kernel_size)
+             metadata["kernel_size"] = kernel_size
+         elif method == "wiener":
+             noise_power = kwargs.get("noise_power", None)
+             denoised_intensities = signal.wiener(intensities, noise=noise_power)
+             metadata["noise_power"] = noise_power
+
+         self.preprocessing_log.append(f"Noise reduction: {method}")
+         metadata["final_noise_level"] = np.std(np.diff(denoised_intensities))
+
+         return denoised_intensities, metadata
+
+     def technique_specific_preprocessing(
+         self, wavenumbers: np.ndarray, intensities: np.ndarray, technique: str
+     ) -> Tuple[np.ndarray, Dict]:
+         """
+         Apply technique-specific preprocessing optimizations
+
+         Args:
+             wavenumbers: Wavenumber array
+             intensities: Intensity array
+             technique: Spectroscopy technique
+
+         Returns:
+             Processed intensities and metadata
+         """
+         processed_intensities = intensities.copy()
+         metadata = {"technique": technique, "optimizations_applied": []}
+
+         if technique == SpectroscopyType.ATR_FTIR:
+             processed_intensities = self._atr_correction(wavenumbers, intensities)
+             metadata["optimizations_applied"].append("ATR_penetration_correction")
+         elif technique == SpectroscopyType.RAMAN:
+             processed_intensities = self._cosmic_ray_removal(intensities)
+             metadata["optimizations_applied"].append("cosmic_ray_removal")
+             processed_intensities = self._fluorescence_correction(
+                 wavenumbers, processed_intensities
+             )
+             metadata["optimizations_applied"].append("fluorescence_correction")
+         elif technique == SpectroscopyType.FTIR:
+             processed_intensities = self._atmospheric_correction(
+                 wavenumbers, intensities
+             )
+             metadata["optimizations_applied"].append("atmospheric_correction")
+
+         self.preprocessing_log.append(f"Technique-specific preprocessing: {technique}")
+         return processed_intensities, metadata
+
+     def _atr_correction(
+         self, wavenumbers: np.ndarray, intensities: np.ndarray
+     ) -> np.ndarray:
+         """
+         Apply ATR correction for wavelength-dependent penetration depth
+         """
+         # Penetration depth is inversely proportional to wavenumber, so
+         # low-wavenumber bands are over-weighted; attenuate them relative
+         # to the highest wavenumber
+         correction_factor = np.sqrt(wavenumbers / np.max(wavenumbers))
+         return intensities * correction_factor
+
+     def _cosmic_ray_removal(
+         self, intensities: np.ndarray, threshold: float = 3.0
+     ) -> np.ndarray:
+         """
+         Remove cosmic ray spikes from Raman spectra
+         """
+         diff = np.abs(np.diff(intensities, prepend=intensities[0]))
+         mean_diff = np.mean(diff)
+         std_diff = np.std(diff)
+
+         spikes = diff > (mean_diff + threshold * std_diff)
+         corrected = intensities.copy()
+
+         # Replace each flagged point with the mean of its neighbors
+         for i in np.where(spikes)[0]:
+             if 0 < i < len(corrected) - 1:
+                 corrected[i] = (corrected[i - 1] + corrected[i + 1]) / 2
+
+         return corrected
+
+     def _fluorescence_correction(
+         self, wavenumbers: np.ndarray, intensities: np.ndarray
+     ) -> np.ndarray:
+         """
+         Remove broad fluorescence background from Raman spectra
+         """
+         try:
+             coeffs = np.polyfit(wavenumbers, intensities, deg=3)
+             background = np.polyval(coeffs, wavenumbers)
+             return intensities - background
+         except np.linalg.LinAlgError:
+             return intensities
+
+     def _atmospheric_correction(
+         self, wavenumbers: np.ndarray, intensities: np.ndarray
+     ) -> np.ndarray:
+         """
+         Correct for atmospheric CO2 absorption by interpolating across its band
+         """
+         corrected = intensities.copy()
+         co2_mask = (wavenumbers >= 2350) & (wavenumbers <= 2380)
+         if np.any(co2_mask):
+             non_co2_idx = ~co2_mask
+             if np.any(non_co2_idx):
+                 interp_func = interp1d(
+                     wavenumbers[non_co2_idx],
+                     corrected[non_co2_idx],
+                     kind="linear",
+                     bounds_error=False,
+                     fill_value="extrapolate",
+                 )
+                 corrected[co2_mask] = interp_func(wavenumbers[co2_mask])
+
+         return corrected
+
+
+ class MultiModalSpectroscopyEngine:
+     """Engine for handling multi-modal spectroscopy data fusion."""
+
+     def __init__(self):
+         self.preprocessor = AdvancedPreprocessor()
+         self.registered_techniques = {}
+         self.fusion_strategies = [
+             "concatenation",
+             "weighted_average",
+             "pca_fusion",
+             "attention_fusion",
+         ]
+
+     def register_spectrum(
+         self,
+         wavenumbers: np.ndarray,
+         intensities: np.ndarray,
+         technique: str,
+         metadata: Optional[Dict] = None,
+     ) -> str:
+         """
+         Register a spectrum for multi-modal analysis
+
+         Args:
+             wavenumbers: Wavenumber array
+             intensities: Intensity array
+             technique: Spectroscopy technique type
+             metadata: Additional metadata for the spectrum
+
+         Returns:
+             Spectrum ID for tracking
+         """
+         spectrum_id = f"{technique}_{len(self.registered_techniques)}"
+
+         self.registered_techniques[spectrum_id] = {
+             "wavenumbers": wavenumbers,
+             "intensities": intensities,
+             "technique": technique,
+             "metadata": metadata or {},
+             "characteristics": SPECTRAL_CHARACTERISTICS.get(technique),
+         }
+
+         return spectrum_id
+
+     def preprocess_spectrum(
+         self, spectrum_id: str, preprocessing_config: Optional[Dict] = None
+     ) -> Dict:
+         """
+         Apply comprehensive preprocessing to a registered spectrum
+
+         Args:
+             spectrum_id: ID of registered spectrum
+             preprocessing_config: Configuration for preprocessing steps
+
+         Returns:
+             Processing results and metadata
+         """
+         if spectrum_id not in self.registered_techniques:
+             raise ValueError(f"Spectrum with ID {spectrum_id} not found.")
+
+         spectrum_data = self.registered_techniques[spectrum_id]
+         wavenumbers = spectrum_data["wavenumbers"]
+         intensities = spectrum_data["intensities"]
+         technique = spectrum_data["technique"]
+
+         config = preprocessing_config or {}
+
+         processed_intensities = intensities.copy()
+         processing_metadata = {"steps_applied": [], "step_metadata": {}}
+
+         if config.get("baseline_correction", True):
+             method = config.get("baseline_method", "airpls")
+             processed_intensities, baseline_metadata = (
+                 self.preprocessor.baseline_correction(
+                     wavenumbers, processed_intensities, method=method
+                 )
+             )
+             processing_metadata["steps_applied"].append("baseline_correction")
+             processing_metadata["step_metadata"][
+                 "baseline_correction"
+             ] = baseline_metadata
+
+         processed_intensities, technique_meta = (
+             self.preprocessor.technique_specific_preprocessing(
+                 wavenumbers, processed_intensities, technique
+             )
+         )
+         processing_metadata["steps_applied"].append("technique_specific")
+         processing_metadata["step_metadata"]["technique_specific"] = technique_meta
+
+         if config.get("noise_reduction", True):
+             method = config.get("noise_method", "savgol")
+             processed_intensities, noise_meta = self.preprocessor.noise_reduction(
+                 wavenumbers, processed_intensities, method=method
+             )
+             processing_metadata["steps_applied"].append("noise_reduction")
+             processing_metadata["step_metadata"]["noise_reduction"] = noise_meta
+
+         if config.get("normalization", True):
+             method = config.get("norm_method", "vector")
+             processed_intensities, norm_meta = self.preprocessor.normalization(
+                 wavenumbers, processed_intensities, method=method
+             )
+             processing_metadata["steps_applied"].append("normalization")
+             processing_metadata["step_metadata"]["normalization"] = norm_meta
+
+         self.registered_techniques[spectrum_id][
+             "processed_intensities"
+         ] = processed_intensities
+         self.registered_techniques[spectrum_id][
+             "processing_metadata"
+         ] = processing_metadata
+
+         return {
+             "spectrum_id": spectrum_id,
+             "processed_intensities": processed_intensities,
+             "processing_metadata": processing_metadata,
+             "quality_score": self._calculate_quality_score(
+                 wavenumbers, processed_intensities
+             ),
+         }
+
+     def fuse_spectra(
+         self,
+         spectrum_ids: List[str],
+         fusion_strategy: str = "concatenation",
+         target_wavenumber_range: Optional[Tuple[float, float]] = None,
+     ) -> Dict:
+         """Fuse multiple spectra using the specified strategy
+
+         Args:
+             spectrum_ids: List of spectrum IDs to fuse
+             fusion_strategy: Fusion strategy ('concatenation', 'weighted_average', etc.)
+             target_wavenumber_range: Common wavenumber range for fusion
+
+         Returns:
+             Fused spectrum data and processing metadata
+         """
+         if not all(sid in self.registered_techniques for sid in spectrum_ids):
+             raise ValueError("Some spectrum IDs not found")
+
+         spectra_data = [self.registered_techniques[sid] for sid in spectrum_ids]
+
+         if fusion_strategy == "concatenation":
+             return self._concatenation_fusion(spectra_data, target_wavenumber_range)
+         elif fusion_strategy == "weighted_average":
+             return self._weighted_average_fusion(spectra_data, target_wavenumber_range)
+         elif fusion_strategy == "pca_fusion":
+             return self._pca_fusion(spectra_data, target_wavenumber_range)
+         elif fusion_strategy == "attention_fusion":
+             return self._attention_fusion(spectra_data, target_wavenumber_range)
+         else:
+             raise ValueError(
+                 f"Unknown or unsupported fusion strategy: {fusion_strategy}"
+             )
+
+     def _interpolate_to_common_grid(
+         self,
+         spectra_data: List[Dict],
+         target_range: Tuple[float, float],
+         num_points: int = 1000,
+     ) -> Tuple[np.ndarray, List[np.ndarray]]:
+         """Interpolate all spectra to a common wavenumber grid"""
+         common_wavenumbers = np.linspace(target_range[0], target_range[1], num_points)
+         interpolated_intensities_list = []
+
+         for spectrum in spectra_data:
+             wavenumbers = spectrum["wavenumbers"]
+             intensities = spectrum.get("processed_intensities", spectrum["intensities"])
+
+             valid_range = (wavenumbers.min(), wavenumbers.max())
+             mask = (common_wavenumbers >= valid_range[0]) & (
+                 common_wavenumbers <= valid_range[1]
+             )
+
+             interp_intensities = np.zeros_like(common_wavenumbers)
+             if np.any(mask):
+                 interp_func = interp1d(
+                     wavenumbers,
+                     intensities,
+                     kind="linear",
+                     bounds_error=False,
+                     fill_value=0,
+                 )
+                 interp_intensities[mask] = interp_func(common_wavenumbers[mask])
+
+             interpolated_intensities_list.append(interp_intensities)
+
+         return common_wavenumbers, interpolated_intensities_list
+
+     def _concatenation_fusion(
+         self, spectra_data: List[Dict], target_range: Optional[Tuple[float, float]]
+     ) -> Dict:
+         """Simple concatenation of spectra"""
+         if target_range is None:
+             # Default to the overlap of all registered ranges
+             min_wn = max(s["wavenumbers"].min() for s in spectra_data)
+             max_wn = min(s["wavenumbers"].max() for s in spectra_data)
+             target_range = (min_wn, max_wn)
+
+         common_wn, interpolated_intensities = self._interpolate_to_common_grid(
+             spectra_data, target_range
+         )
+
+         fused_intensities = np.concatenate(interpolated_intensities)
+         fused_wavenumbers = np.tile(common_wn, len(spectra_data))
+
+         return {
+             "wavenumbers": fused_wavenumbers,
+             "intensities": fused_intensities,
+             "fusion_strategy": "concatenation",
+             "source_techniques": [s["technique"] for s in spectra_data],
+             "common_range": target_range,
+         }
+
+     def _weighted_average_fusion(
+         self, spectra_data: List[Dict], target_range: Optional[Tuple[float, float]]
+     ) -> Dict:
+         """Weighted average fusion based on data quality"""
+         if target_range is None:
+             min_wn = max(s["wavenumbers"].min() for s in spectra_data)
+             max_wn = min(s["wavenumbers"].max() for s in spectra_data)
+             target_range = (min_wn, max_wn)
+
+         common_wn, interpolated_intensities = self._interpolate_to_common_grid(
+             spectra_data, target_range
+         )
+
+         weights = []
+         for interp_intensities in interpolated_intensities:
+             quality_score = self._calculate_quality_score(
+                 common_wn, interp_intensities
+             )
+             weights.append(quality_score)
+
+         weights = np.array(weights)
+         weights_sum = np.sum(weights)
+         weights = (
+             weights / weights_sum
+             if weights_sum > 0
+             else np.full_like(weights, 1.0 / len(weights))
+         )
+
+         fused_intensities = np.zeros_like(common_wn)
+         for i, intensities in enumerate(interpolated_intensities):
+             fused_intensities += weights[i] * intensities
+
+         return {
+             "wavenumbers": common_wn,
+             "intensities": fused_intensities,
+             "fusion_strategy": "weighted_average",
+             "weights": weights.tolist(),
+             "source_techniques": [s["technique"] for s in spectra_data],
+             "common_range": target_range,
+         }
+
+     def _pca_fusion(
+         self, spectra_data: List[Dict], target_range: Optional[Tuple[float, float]]
+     ) -> Dict:
+         """PCA-based fusion to extract common features"""
+         if target_range is None:
+             min_wn = max(s["wavenumbers"].min() for s in spectra_data)
+             max_wn = min(s["wavenumbers"].max() for s in spectra_data)
+             target_range = (min_wn, max_wn)
+
+         common_wn, interpolated_intensities = self._interpolate_to_common_grid(
+             spectra_data, target_range
+         )
+
+         spectra_matrix = np.vstack(interpolated_intensities)
+
+         n_components = min(len(spectra_data), 3)
+         pca = PCA(n_components=n_components)
+         # Rows are spectra, columns are wavenumber points, so each principal
+         # component lives on the common wavenumber grid
+         pca.fit(spectra_matrix)
+
+         # Variance-weighted combination of components yields one fused spectrum
+         fused_intensities = np.dot(pca.explained_variance_ratio_, pca.components_)
+
+         return {
+             "wavenumbers": common_wn,
+             "intensities": fused_intensities,
+             "fusion_strategy": "pca_fusion",
+             "explained_variance_ratio": pca.explained_variance_ratio_.tolist(),
+             "n_components": n_components,
+             "source_techniques": [s["technique"] for s in spectra_data],
+             "common_range": target_range,
+         }
+
+     def _attention_fusion(
+         self, spectra_data: List[Dict], target_range: Optional[Tuple[float, float]]
+     ) -> Dict:
+         """Attention-based fusion using a simple neural attention-like mechanism"""
+         if target_range is None:
+             min_wn = max(s["wavenumbers"].min() for s in spectra_data)
+             max_wn = min(s["wavenumbers"].max() for s in spectra_data)
+             target_range = (min_wn, max_wn)
+
+         common_wn, interpolated_intensities = self._interpolate_to_common_grid(
+             spectra_data, target_range
+         )
+
+         attention_scores = []
+         for intensities in interpolated_intensities:
+             variance = np.var(intensities)
+             quality = self._calculate_quality_score(common_wn, intensities)
+             attention_scores.append(variance * quality)
+
+         attention_scores = np.array(attention_scores)
+         # Softmax with max subtraction for numerical stability
+         exp_scores = np.exp(attention_scores - np.max(attention_scores))
+         attention_weights = exp_scores / np.sum(exp_scores)
+
+         fused_intensities = np.zeros_like(common_wn)
+         for i, intensities in enumerate(interpolated_intensities):
+             fused_intensities += attention_weights[i] * intensities
+
+         return {
+             "wavenumbers": common_wn,
+             "intensities": fused_intensities,
+             "fusion_strategy": "attention_fusion",
+             "attention_weights": attention_weights.tolist(),
+             "source_techniques": [s["technique"] for s in spectra_data],
+             "common_range": target_range,
+         }
+
+     def _calculate_quality_score(
+         self, wavenumbers: np.ndarray, intensities: np.ndarray
+     ) -> float:
+         """Calculate a spectral quality score from SNR, peak prominence, and baseline stability"""
+         try:
+             if len(intensities) < 2:
+                 return 0.0
+             signal_power = np.var(intensities)
+             noise_power = np.var(np.diff(intensities))
+             snr = signal_power / noise_power if noise_power > 0 else 1e6
+
+             peaks, properties = find_peaks(
+                 intensities, prominence=0.1 * np.std(intensities)
+             )
+             peak_prominence = (
+                 np.mean(properties["prominences"]) if len(peaks) > 0 else 0
+             )
+
+             baseline_stability = 1.0 / (
+                 1.0 + np.std(intensities[:10]) + np.std(intensities[-10:])
+             )
+
+             quality_score = (
+                 np.log10(max(snr, 1)) * 0.5
+                 + peak_prominence * 0.3
+                 + baseline_stability * 0.2
+             )
+
+             return max(0, min(1, quality_score))
+         except Exception:
+             return 0.5
+
+     def get_technique_recommendations(self, sample_type: str) -> List[Dict]:
+         """
+         Recommend optimal spectroscopy techniques for a given sample type
+
+         Args:
+             sample_type: Type of sample (e.g., 'solid_polymer', 'liquid_polymer', 'thin_film')
+
+         Returns:
+             List of recommended techniques with rationale
+         """
+         recommendations = []
+
+         if sample_type in ["solid_polymer", "polymer_pellets", "polymer_film"]:
+             recommendations.extend(
+                 [
+                     {
+                         "technique": SpectroscopyType.ATR_FTIR,
+                         "priority": "high",
+                         "rationale": "Minimal sample preparation, direct solid contact analysis",
+                         "characteristics": SPECTRAL_CHARACTERISTICS[
+                             SpectroscopyType.ATR_FTIR
+                         ],
+                     },
+                     {
+                         "technique": SpectroscopyType.RAMAN,
+                         "priority": "medium",
+                         "rationale": "Complementary vibrational information, non-destructive",
+                         "characteristics": SPECTRAL_CHARACTERISTICS[
+                             SpectroscopyType.RAMAN
+                         ],
+                     },
+                 ]
+             )
+         elif sample_type in ["liquid_polymer", "polymer_solution"]:
+             recommendations.extend(
+                 [
+                     {
+                         "technique": SpectroscopyType.FTIR,
+                         "priority": "high",
+                         "rationale": "Versatile for liquid samples, wide spectral range",
+                         "characteristics": SPECTRAL_CHARACTERISTICS[
+                             SpectroscopyType.FTIR
+                         ],
+                     },
+                     {
+                         "technique": SpectroscopyType.RAMAN,
+                         "priority": "high",
+                         "rationale": "Water compatible, molecular vibrations",
+                         "characteristics": SPECTRAL_CHARACTERISTICS[
+                             SpectroscopyType.RAMAN
+                         ],
+                     },
+                 ]
+             )
+         elif sample_type in ["weathered_polymer", "aged_polymer"]:
+             recommendations.extend(
+                 [
+                     {
+                         "technique": SpectroscopyType.ATR_FTIR,
+                         "priority": "high",
+                         "rationale": "Surface analysis for weathering products",
+                         "characteristics": SPECTRAL_CHARACTERISTICS[
+                             SpectroscopyType.ATR_FTIR
+                         ],
+                     },
+                     {
+                         "technique": SpectroscopyType.FTIR,
+                         "priority": "medium",
+                         "rationale": "Bulk analysis for degradation assessment",
+                         "characteristics": SPECTRAL_CHARACTERISTICS[
+                             SpectroscopyType.FTIR
+                         ],
+                     },
+                 ]
+             )
+
+         return recommendations
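For completeness, a quick sanity check of the `AdvancedPreprocessor` chain in isolation. This is a sketch: the spectrum is synthetic and the parameter values (`lambda_`, `p`, window sizes) are illustrative, not project defaults:

```python
import numpy as np
from modules.advanced_spectroscopy import AdvancedPreprocessor

pre = AdvancedPreprocessor()
wn = np.linspace(400.0, 4000.0, 1800)
# Synthetic carbonyl-like band on a sloped baseline with a little noise
y = 0.0002 * wn + np.exp(-((wn - 1715.0) ** 2) / 400.0) + 0.02 * np.random.rand(wn.size)

y, meta_b = pre.baseline_correction(wn, y, method="als", lambda_=1e5, p=0.01)
y, meta_n = pre.noise_reduction(wn, y, method="savgol", window_length=15, polyorder=3)
y, meta_v = pre.normalization(wn, y, method="area")

# ['Baseline correction: als', 'Noise reduction: savgol', 'Normalization: area']
print(pre.preprocessing_log)
```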