"""
Processor Base Module
This module defines the base Processor interface and related abstractions
for building processing pipelines.
Author: FACETpy Team
Date: 2025-01-12
"""
from abc import ABC, abstractmethod
from typing import Any
from loguru import logger
from .context import ProcessingContext
[docs]
class ProcessorError(Exception):
"""Base exception for all processor-related errors."""
pass
[docs]
class ProcessorValidationError(ProcessorError):
"""Raised when processor validation fails."""
pass
[docs]
class Processor(ABC):
"""
Base class for all processors in the pipeline.
Processors are the fundamental building blocks of FACETpy pipelines.
Each processor:
- Takes a ProcessingContext as input
- Performs a specific operation
- Returns a new ProcessingContext (immutable by default)
- Can validate prerequisites before processing
- Tracks its execution in the context history
Subclasses must implement:
- process(): Main processing logic
- validate(): Check prerequisites (optional)
Example::
class HighPassFilter(Processor):
name = "highpass_filter"
def __init__(self, freq: float):
self.freq = freq
def validate(self, context: ProcessingContext) -> None:
if not context.has_raw():
raise ProcessorValidationError("No raw data available")
def process(self, context: ProcessingContext) -> ProcessingContext:
raw = context.get_raw().copy()
raw.filter(l_freq=self.freq, h_freq=None)
return context.with_raw(raw)
"""
# Class attributes (can be overridden in subclasses)
name: str = "base_processor"
description: str = "Base processor"
version: str = "1.0.0"
# Processing flags
requires_triggers: bool = False
requires_raw: bool = True
modifies_raw: bool = True
parallel_safe: bool = True # Can be run in separate parallel workers
# Channel-sequential flags
channel_wise: bool = False # Can be run on a single-channel subset context
run_once: bool = False # In channel_sequential mode: execute only for the
# first channel; subsequent channels are skipped
[docs]
def __init__(self):
"""Initialize processor."""
self._parameters = self._get_parameters()
[docs]
def __call__(self, context: ProcessingContext) -> ProcessingContext:
"""
Make processor callable.
Args:
context: Input processing context
Returns:
Output processing context
"""
return self.execute(context)
[docs]
def execute(self, context: ProcessingContext) -> ProcessingContext:
"""
Execute the processor with validation and history tracking.
Args:
context: Input processing context
Returns:
Output processing context
Raises:
ProcessorValidationError: If validation fails
"""
logger.debug(f"Executing processor: {self.name}")
self.validate(context)
result = self.process(context)
if result is None:
result = context
result.add_history_entry(name=self.name, processor_type=self.__class__.__name__, parameters=self._parameters)
logger.debug(f"Completed processor: {self.name}")
return result
[docs]
def validate(self, context: ProcessingContext) -> None:
"""
Validate that prerequisites are met.
Override this method to add custom validation logic.
Args:
context: Processing context
Raises:
ProcessorValidationError: If validation fails
"""
if self.requires_raw and not context.has_raw():
raise ProcessorValidationError(f"{self.name} requires raw data, but none is available")
if self.requires_triggers and not context.has_triggers():
raise ProcessorValidationError(f"{self.name} requires triggers, but none are available")
[docs]
@abstractmethod
def process(self, context: ProcessingContext) -> ProcessingContext:
"""
Process the context.
This is the main method to implement in subclasses.
Args:
context: Input processing context
Returns:
Output processing context. If None is returned, the input
context is used (no-op behavior).
"""
pass
def _get_parameters(self) -> dict[str, Any]:
"""
Get processor parameters for history tracking.
Override this to customize parameter tracking.
Returns:
Dictionary of parameters
"""
# Get all instance variables that don't start with _
params = {k: v for k, v in self.__dict__.items() if not k.startswith("_") and not callable(v)}
return params
[docs]
def __repr__(self) -> str:
"""String representation."""
params_str = ", ".join(f"{k}={v}" for k, v in self._parameters.items())
return f"{self.__class__.__name__}({params_str})"
[docs]
class SequenceProcessor(Processor):
"""
Processor that executes multiple processors in sequence.
This is a composite processor that allows grouping multiple
processing steps into a single unit.
Example::
preprocessing = SequenceProcessor([
HighPassFilter(freq=1.0),
UpSample(factor=10),
NotchFilter(freqs=[50])
])
"""
name = "sequence"
requires_raw = False # Depends on child processors
[docs]
def __init__(self, processors: list[Processor]):
"""
Initialize sequence processor.
Args:
processors: List of processors to execute in order
"""
self.processors = processors
super().__init__()
[docs]
def validate(self, context: ProcessingContext) -> None:
"""Validate all child processors."""
for processor in self.processors:
processor.validate(context)
[docs]
def process(self, context: ProcessingContext) -> ProcessingContext:
"""Execute all processors in sequence."""
result = context
for processor in self.processors:
result = processor.execute(result)
return result
[docs]
class ConditionalProcessor(Processor):
"""
Processor that executes conditionally based on context.
Example::
ConditionalProcessor(
condition=lambda ctx: ctx.metadata.custom.get("needs_upsampling"),
processor=UpSample(factor=10),
else_processor=None # Skip if condition is False
)
"""
name = "conditional"
requires_raw = False
[docs]
def __init__(self, condition: callable, processor: Processor, else_processor: Processor | None = None):
"""
Initialize conditional processor.
Args:
condition: Callable that takes context and returns bool
processor: Processor to execute if condition is True
else_processor: Processor to execute if condition is False (optional)
"""
self.condition = condition
self.processor = processor
self.else_processor = else_processor
super().__init__()
[docs]
def validate(self, context: ProcessingContext) -> None:
"""Validate based on condition."""
if self.condition(context):
self.processor.validate(context)
elif self.else_processor is not None:
self.else_processor.validate(context)
[docs]
def process(self, context: ProcessingContext) -> ProcessingContext:
"""Execute conditionally."""
if self.condition(context):
logger.debug(f"Condition met, executing {self.processor.name}")
return self.processor.execute(context)
elif self.else_processor is not None:
logger.debug(f"Condition not met, executing {self.else_processor.name}")
return self.else_processor.execute(context)
else:
logger.debug("Condition not met, skipping")
return context
[docs]
class SwitchProcessor(Processor):
"""
Processor that switches between multiple processors based on selector.
Example::
SwitchProcessor(
selector=lambda ctx: "motion" if ctx.has_motion_data() else "aas",
cases={
"aas": AASCorrection(),
"motion": MotionBasedCorrection()
},
default=AASCorrection()
)
"""
name = "switch"
requires_raw = False
[docs]
def __init__(self, selector: callable, cases: dict[str, Processor], default: Processor | None = None):
"""
Initialize switch processor.
Args:
selector: Callable that takes context and returns case key
cases: Dictionary mapping case keys to processors
default: Default processor if selector returns unknown key
"""
self.selector = selector
self.cases = cases
self.default = default
super().__init__()
[docs]
def validate(self, context: ProcessingContext) -> None:
"""Validate selected processor."""
case_key = self.selector(context)
processor = self.cases.get(case_key, self.default)
if processor is None:
raise ProcessorValidationError(f"No processor for case '{case_key}' and no default provided")
processor.validate(context)
[docs]
def process(self, context: ProcessingContext) -> ProcessingContext:
"""Execute selected processor."""
case_key = self.selector(context)
processor = self.cases.get(case_key, self.default)
if processor is None:
raise ProcessorValidationError(f"No processor for case '{case_key}' and no default provided")
logger.debug(f"Selected case '{case_key}', executing {processor.name}")
return processor.execute(context)
[docs]
class NoOpProcessor(Processor):
"""Processor that does nothing (useful for testing or placeholders)."""
name = "noop"
requires_raw = False
modifies_raw = False
[docs]
def process(self, context: ProcessingContext) -> ProcessingContext:
"""Return context unchanged."""
return context
[docs]
class LambdaProcessor(Processor):
"""
Processor that executes a lambda function.
Useful for quick custom operations without creating a full processor class.
Example::
LambdaProcessor(
name="remove_bad_channels",
func=lambda ctx: ctx.with_raw(
ctx.get_raw().copy().drop_channels(["EKG"])
)
)
"""
requires_raw = False
[docs]
def __init__(self, name: str, func: callable):
"""
Initialize lambda processor.
Args:
name: Processor name
func: Function that takes context and returns new context
"""
self.name = name
self.func = func
super().__init__()
[docs]
def process(self, context: ProcessingContext) -> ProcessingContext:
"""Execute lambda function."""
return self.func(context)