Architecture Overview
FACETpy 2.0 is built on a modular, pipeline-based architecture that provides flexibility, extensibility, and ease of use.
Core Concepts
The architecture consists of four main components:
Processors - Individual processing steps
Context - Data container passed between processors
Pipeline - Workflow orchestrator
Registry - Plugin discovery system
Processors
Processors are the building blocks of FACETpy. Each processor:
Performs a single, well-defined operation
Receives a
ProcessingContextas inputReturns a new
ProcessingContextas outputIs independently testable and reusable
Example Processor
from facet.core import Processor, register_processor
@register_processor
class MyProcessor(Processor):
name = "my_processor"
description = "Does something useful"
def __init__(self, param1, param2=default):
self.param1 = param1
self.param2 = param2
super().__init__()
def validate(self, context):
"""Validate prerequisites before processing."""
super().validate(context)
if not context.has_triggers():
raise ProcessorValidationError("Triggers required")
def process(self, context):
"""Main processing logic."""
raw = context.get_raw()
# Do something with the data
# ...
return context.with_raw(modified_raw)
Processor Lifecycle
When processor.execute(context) is called:
Validate - Check prerequisites
Process - Execute main logic
Record - Add history entry
Return - Return new context
result_context = processor.execute(input_context)
Processing Context
The ProcessingContext is a container that holds:
Raw Data - MNE Raw object with EEG data
Metadata - Triggers, artifact info, parameters
Estimated Noise - Accumulated artifact estimates
Processing History - Record of all operations
Context is Immutable
Context follows an immutable-by-default pattern:
# Creating new contexts
context1 = ProcessingContext(raw=raw, metadata=metadata)
context2 = context1.with_raw(new_raw) # context1 unchanged
context3 = context2.with_metadata(new_metadata) # context2 unchanged
This prevents accidental modifications and makes debugging easier.
Accessing Data
# Get data
raw = context.get_raw() # Current processed data
raw_orig = context.get_raw_original() # Original data
triggers = context.get_triggers() # Trigger positions
noise = context.get_estimated_noise() # Artifact estimates
# Check availability
if context.has_triggers():
triggers = context.get_triggers()
if context.has_estimated_noise():
noise = context.get_estimated_noise()
Processing Metadata
Metadata tracks processing parameters:
metadata = context.metadata
# Standard fields
triggers = metadata.triggers
artifact_length = metadata.artifact_length
upsampling_factor = metadata.upsampling_factor
# Custom data
metadata.custom['my_key'] = my_value
my_value = metadata.custom.get('my_key')
Pipeline
Pipeline orchestrates processor execution:
from facet.core import Pipeline
pipeline = Pipeline([
processor1,
processor2,
processor3
], name="My Pipeline")
result = pipeline.run()
Pipeline Features
Sequential Execution
result = pipeline.run() # Runs processors in order
Channel-Sequential Execution (Memory-optimized)
# Recommended for long recordings / upsampling-heavy pipelines
result = pipeline.run(channel_sequential=True)
Parallel Execution
# Throughput-oriented alternative when sufficient RAM is available
result = pipeline.run(parallel=True, n_jobs=-1)
Initial Context
initial_context = ProcessingContext(raw=raw)
result = pipeline.run(initial_context=initial_context)
Error Handling
result = pipeline.run()
if result.success:
final_context = result.context
print(f"Completed in {result.execution_time:.2f}s")
else:
print(f"Failed at: {result.failed_processor}")
print(f"Error: {result.error}")
Composite Processors
Build complex workflows with composite processors:
from facet.core import SequenceProcessor, ConditionalProcessor
# Run sequence of processors
correction_sequence = SequenceProcessor([
AASCorrection(window_size=30),
ANCCorrection()
])
# Conditional execution
conditional_pca = ConditionalProcessor(
condition=lambda ctx: ctx.metadata.custom.get('needs_pca', False),
processor=PCACorrection(n_components=0.95)
)
Registry
The registry provides plugin discovery and management.
Registration
Register processors with a decorator:
from facet.core import register_processor
@register_processor
class MyProcessor(Processor):
name = "my_processor" # Unique identifier
Discovery
from facet.core import get_processor, list_processors
# Get processor class by name
ProcessorClass = get_processor("aas_correction")
processor = ProcessorClass(window_size=30)
# List all registered processors
all_processors = list_processors()
for name, proc_class in all_processors.items():
print(f"{name}: {proc_class.__name__}")
Parallel Execution
FACETpy supports two types of parallelization:
Pipeline-Level Parallelization
Execute multiple pipelines concurrently:
import concurrent.futures
def run_pipeline(pipeline):
return pipeline.run()
pipelines = [create_pipeline(file) for file in files]
with concurrent.futures.ProcessPoolExecutor() as executor:
results = list(executor.map(run_pipeline, pipelines))
Processor-Level Parallelization
Processors marked as parallel_safe can parallelize internally:
@register_processor
class MyProcessor(Processor):
parallel_safe = True # Can run in separate worker processes
channel_wise = True # Can operate on a single-channel subset
def process(self, context):
# This will run in parallel when pipeline.run(parallel=True)
...
Channel-Wise Parallelization
The ParallelExecutor automatically splits by channels:
pipeline = Pipeline([
Loader(path="data.edf"),
TriggerDetector(regex=r"\b1\b"),
AASCorrection(window_size=30) # Will parallelize by channel
])
result = pipeline.run(parallel=True, n_jobs=-1)
Data Flow
Typical data flow through FACETpy:
Each arrow represents a ProcessingContext being passed between processors.
Design Principles
Single Responsibility Each processor does one thing well
Immutability Contexts are not modified in-place
Composability Processors can be combined in any order
Explicit over Implicit Clear validation and error messages
MNE Integration First-class support for MNE objects
Extensibility Easy to add custom processors
Testability Each component independently testable
Benefits
This architecture provides:
Flexibility - Build any workflow
Reusability - Share processors across projects
Maintainability - Clear separation of concerns
Debuggability - Track data flow with history
Performance - Built-in parallelization
Extensibility - Plugin system for custom needs