Processors Guide
Processors are the building blocks of FACETpy. This guide covers all available processors and how to use them effectively.
What is a Processor?
A processor is a single processing step that:
Takes a ProcessingContext as input
Performs one specific operation
Returns a new ProcessingContext as output
Can validate prerequisites before processing
Tracks its execution in the context history
from facet.preprocessing import HighPassFilter
# Create processor
hpf = HighPassFilter(freq=1.0)
# Execute on context
output_context = hpf.execute(input_context)
Processor Lifecycle
When you call processor.execute(context):
Validation - Checks prerequisites (triggers, raw data, etc.)
Processing - Executes main logic
History - Adds entry to processing history
Return - Returns new context
from facet.core import ProcessorValidationError
# Validation runs automatically inside execute()
try:
    result = processor.execute(context)
except ProcessorValidationError as e:
    print(f"Validation failed: {e}")
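The four lifecycle steps can be sketched with a toy base class. This is only an illustration of the pattern, not FACETpy's actual implementation: `ToyContext`, `ToyProcessor`, `ToyValidationError`, and `Scale` are hypothetical names that mirror the behavior described above.

```python
from dataclasses import dataclass, replace


class ToyValidationError(Exception):
    """Hypothetical stand-in for ProcessorValidationError."""


@dataclass(frozen=True)
class ToyContext:
    """Hypothetical stand-in for ProcessingContext (immutable)."""
    data: tuple = ()
    triggers: tuple = ()
    history: tuple = ()


class ToyProcessor:
    name = "toy"
    requires_triggers = False

    def validate(self, context):
        # Check prerequisites before touching the data.
        if self.requires_triggers and not context.triggers:
            raise ToyValidationError(f"{self.name}: triggers required")

    def process(self, context):
        raise NotImplementedError

    def execute(self, context):
        self.validate(context)           # 1. Validation
        result = self.process(context)   # 2. Processing
        # 3. History: record this step on the new context, then 4. return it.
        return replace(result, history=result.history + (self.name,))


class Scale(ToyProcessor):
    name = "scale"
    requires_triggers = True

    def __init__(self, factor):
        self.factor = factor

    def process(self, context):
        scaled = tuple(x * self.factor for x in context.data)
        return replace(context, data=scaled)


ctx = ToyContext(data=(1.0, 2.0), triggers=(0,))
out = Scale(2.0).execute(ctx)
print(out.data, out.history)
```

Because the toy context is frozen, `execute` cannot mutate its input; it has to return a new object, which matches the "returns a new context" step.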
Live Progress Bars
The modern console can render a dedicated progress bar for any long-running
processor. Import the processor_progress helper and advance it as your work
completes. When the console runs in classic mode the helper becomes a no-op, so
it is safe to call regardless of user preferences.
from facet.console import processor_progress
from facet.core import Processor
class EpochAverager(Processor):
    name = "Average epochs"

    def execute(self, context):
        epochs = list(context.get_epochs())
        with processor_progress(total=len(epochs), message="Averaging") as prog:
            for epoch in epochs:
                self._process_epoch(epoch)
                prog.advance(message=f"Processed {prog.current:.0f} epochs")
        return context
processor_progress also supports update(current=..., total=...) and
complete() so you can report arbitrary metrics (samples, files, estimated
seconds, etc.). Combine these updates with your own loguru messages for a rich
view of what the processor is doing.
Note
Processors with channel_wise = True (e.g. Filter or
AASCorrection) automatically emit channel-wise progress when the
pipeline runs them in parallel or channel_sequential mode. For
memory-sensitive workloads, channel_sequential=True is usually the
preferred execution mode. For other workloads consider tracking epochs,
files, or optimization iterations; anything you surface through
processor_progress appears in the live console without affecting legacy
logging.
Available Processors
I/O Processors
Loading Data
Loader - Load EEG data with automatic format detection
from facet.io import Loader
loader = Loader(
    path="data.edf",
    preload=True,  # Load into memory
)
context = loader.execute(None)
BIDSLoader - Load BIDS format data
from facet.io import BIDSLoader
loader = BIDSLoader(
    root="/path/to/bids",
    subject="01",
    session="01",
    task="rest",
    run="01",
)
context = loader.execute(None)
Use run when your BIDS dataset has multiple recordings for the same
subject/session/task combination.
Exporting Data
EDFExporter - Export to EDF format
from facet.io import EDFExporter
exporter = EDFExporter(
    path="output.edf",
    overwrite=True,
)
exporter.execute(context)
BIDSExporter - Export to BIDS format
from facet.io import BIDSExporter
exporter = BIDSExporter(
    root="/path/to/bids",
    subject="01",
    session="01",
    task="rest",
)
exporter.execute(context)
Preprocessing Processors
Filtering
HighPassFilter - Remove slow drifts
from facet.preprocessing import HighPassFilter
hpf = HighPassFilter(freq=0.5) # 0.5 Hz cutoff
context = hpf.execute(context)
LowPassFilter - Remove high-frequency noise
from facet.preprocessing import LowPassFilter
lpf = LowPassFilter(freq=100.0) # 100 Hz cutoff
BandPassFilter - Keep specific frequency range
from facet.preprocessing import BandPassFilter
bpf = BandPassFilter(l_freq=0.5, h_freq=100.0)
NotchFilter - Remove specific frequencies (e.g., line noise)
from facet.preprocessing import NotchFilter
notch = NotchFilter(freqs=[50, 100, 150]) # 50 Hz and harmonics
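The list of notch frequencies depends on the mains frequency and the sampling rate, since harmonics at or above the Nyquist frequency cannot be represented. A small helper along these lines could build the list; `line_noise_harmonics` is a hypothetical name and not part of FACETpy.

```python
def line_noise_harmonics(line_freq, sfreq):
    """Return line_freq and its harmonics strictly below the Nyquist frequency."""
    nyquist = sfreq / 2.0
    harmonics = []
    k = 1
    while k * line_freq < nyquist:
        harmonics.append(k * line_freq)
        k += 1
    return harmonics


print(line_noise_harmonics(50, sfreq=500))  # 50 Hz mains, 500 Hz recording
print(line_noise_harmonics(60, sfreq=250))  # 60 Hz mains, 250 Hz recording
```

The resulting list could then be passed as the `freqs` argument shown above.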
Resampling
UpSample - Increase sampling rate
from facet.preprocessing import UpSample
upsampler = UpSample(factor=10) # 10x upsampling
context = upsampler.execute(context)
DownSample - Decrease sampling rate
from facet.preprocessing import DownSample
downsampler = DownSample(factor=10) # 10x downsampling
Resample - Change to specific sampling rate
from facet.preprocessing import Resample
resampler = Resample(sfreq=1000.0) # Resample to 1000 Hz
Trigger Detection
TriggerDetector - Detect fMRI triggers using regex
from facet.preprocessing import TriggerDetector
detector = TriggerDetector(
    regex=r"\b1\b",  # Pattern to match
)
context = detector.execute(context)
# Check detected triggers
triggers = context.get_triggers()
print(f"Found {len(triggers)} triggers")
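To see what the pattern `\b1\b` selects, it can be tried against a few annotation descriptions with the standard `re` module. The descriptions below are made up for illustration, and the use of `search` semantics (match anywhere in the string) is an assumption about how the detector applies the pattern.

```python
import re

pattern = re.compile(r"\b1\b")

# Hypothetical annotation descriptions, for illustration only.
descriptions = ["1", "11", "Stimulus 1", "R128", "S 1", "TR 10"]

# \b requires a word boundary on both sides, so "1" must stand alone.
matches = [d for d in descriptions if pattern.search(d)]
print(matches)
```

The word boundaries keep the pattern from matching a `1` embedded in a longer token such as `11` or `R128`.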
QRSTriggerDetector - Detect R-peaks for cardiac (BCG) artifact correction
from facet.preprocessing import QRSTriggerDetector
detector = QRSTriggerDetector(
    save_to_annotations=False,  # Optionally persist peaks as MNE annotations
)
MissingTriggerDetector - Detect and interpolate missing triggers
from facet.preprocessing import MissingTriggerDetector
detector = MissingTriggerDetector(
    expected_tr=2.0,  # Expected TR in seconds
    tolerance=0.1,    # 10% tolerance
)
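The idea behind gap filling can be sketched in plain Python. This is a simplified illustration under stated assumptions, not the algorithm MissingTriggerDetector uses: triggers are sorted sample indices, the expected interval is the TR converted to samples, and `fill_missing_triggers` is a hypothetical helper.

```python
def fill_missing_triggers(triggers, expected_interval, tolerance=0.1):
    """Insert evenly spaced triggers wherever a gap spans several intervals.

    triggers: sorted sample indices; expected_interval: samples per TR.
    """
    if not triggers:
        return []
    filled = [triggers[0]]
    for nxt in triggers[1:]:
        prev = filled[-1]
        gap = nxt - prev
        n_intervals = round(gap / expected_interval)
        # Treat the gap as "missing triggers" only if it is close to a
        # whole multiple (at least 2) of the expected interval.
        deviation = abs(gap - n_intervals * expected_interval)
        if n_intervals >= 2 and deviation <= tolerance * expected_interval:
            step = gap / n_intervals
            filled.extend(round(prev + k * step) for k in range(1, n_intervals))
        filled.append(nxt)
    return filled


sfreq = 1000                       # assumed sampling rate in Hz
expected_interval = 2.0 * sfreq    # expected_tr=2.0 s expressed in samples
detected = [0, 2000, 4000, 8000, 10000]   # the trigger at 6000 is missing
print(fill_missing_triggers(detected, expected_interval, tolerance=0.1))
```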
TriggerExplorer / InteractiveTriggerExplorer - Inspect and QA trigger detection results interactively
from facet.preprocessing import TriggerExplorer
explorer = TriggerExplorer()
context = explorer.execute(context)
Alignment
TriggerAligner - Align triggers using cross-correlation
from facet.preprocessing import TriggerAligner
aligner = TriggerAligner(
    ref_trigger_index=0,  # Reference trigger
    search_window=50,     # Maximum lag search window in samples
)
context = aligner.execute(context)
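Cross-correlation alignment amounts to sliding one artifact segment against a reference and picking the lag with the highest correlation score. The following is a minimal pure-Python sketch of that idea, not TriggerAligner's implementation; `best_lag` is a hypothetical helper and the signals are toy data.

```python
def best_lag(reference, candidate, search_window):
    """Return the lag (in samples) that maximizes the cross-correlation."""
    best, best_score = 0, float("-inf")
    for lag in range(-search_window, search_window + 1):
        score = 0.0
        for i, ref_value in enumerate(reference):
            j = i + lag
            if 0 <= j < len(candidate):   # ignore samples shifted out of range
                score += ref_value * candidate[j]
        if score > best_score:
            best, best_score = lag, score
    return best


reference = [0, 0, 1, 3, 1, 0, 0, 0]
shifted = [0, 0, 0, 0, 1, 3, 1, 0]   # same shape, delayed by 2 samples
print(best_lag(reference, shifted, search_window=3))
```

A trigger whose segment yields a nonzero lag would then be moved by that many samples so that all artifact epochs line up with the reference.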
SubsampleAligner - Subsample-precision alignment
from facet.preprocessing import SubsampleAligner
aligner = SubsampleAligner(
    ref_trigger_index=0,
    search_window=20,
)
SliceAligner - Align artifacts slice-by-slice
from facet.preprocessing import SliceAligner
aligner = SliceAligner()
context = aligner.execute(context)
Data Transforms
CutAcquisitionWindow / PasteAcquisitionWindow - Remove and restore the acquisition window around fMRI triggers for cleaner downstream processing.
from facet.preprocessing import CutAcquisitionWindow, PasteAcquisitionWindow
cutter = CutAcquisitionWindow()
paster = PasteAcquisitionWindow()
Crop - Crop the raw recording to a time range
from facet.preprocessing import Crop
crop = Crop(tmin=10.0, tmax=300.0) # Keep 10 s – 300 s
MagicErasor - Interactively erase selected signal segments with multiple modes
from facet.preprocessing import MagicErasor
# Opens an interactive editor and stays open until "Done" is clicked.
# Multiple edits can be applied in one session.
erasor = MagicErasor(
    picks="eeg",          # channels to edit
    default_mode="zero",  # zero | mean | interpolate | generated_eeg
    random_seed=42,       # optional, used for generated_eeg mode
)
context = erasor.execute(context)
MagicErasor precision controls:
View Center (s): move the visible time window.
Window (s): zoom in/out horizontally for precise segment selection.
Y Zoom: zoom amplitude vertically (default 0.5, max 3.0).
Apply Edit applies the current selection without closing the window.
Done confirms all edits and returns the updated context.
Cancel closes without applying the session changes.
PickChannels / DropChannels - Select or remove channels
from facet.preprocessing import PickChannels, DropChannels
picker = PickChannels(picks=["Fp1", "Fp2", "F3", "F4"])
dropper = DropChannels(channels=["ECG", "EOG"])
ChannelStandardizer - Convert EEG channels to predefined standards
from facet.preprocessing import ChannelStandardizer
# Convert to classic 10-20 subset and keep auxiliary channels (stim/ecg)
standardizer = ChannelStandardizer("10-20", keep_auxiliary=True)
context = standardizer.execute(context)
# Or use an explicit custom subset
custom = ChannelStandardizer(["Cz", "Pz", "Oz"], on_missing="ignore", keep_auxiliary=False)
context = custom.execute(context)
PrintMetric - Print a context metadata value to the console
from facet.preprocessing import PrintMetric
printer = PrintMetric(key="triggers")
printer.execute(context)
Correction Processors
AASCorrection - Averaged Artifact Subtraction
The main correction algorithm:
from facet.correction import AASCorrection
aas = AASCorrection(
    window_size=30,                # Sliding window size
    correlation_threshold=0.975,   # Correlation threshold
    realign_after_averaging=True,  # Realign to template
)
context = aas.execute(context)
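The core idea of averaged artifact subtraction is to build a template from neighboring artifact epochs and subtract it from each epoch. The sketch below is a deliberately simplified illustration and not FACETpy's algorithm: it ignores the correlation threshold and realignment, and `aas_subtract` is a hypothetical helper operating on plain lists.

```python
def aas_subtract(epochs, window_size):
    """Subtract a sliding-window average template from each epoch.

    epochs: list of equal-length lists, one per artifact (e.g. one per TR).
    """
    n = len(epochs)
    half = window_size // 2
    corrected = []
    for k, epoch in enumerate(epochs):
        # Neighboring epochs centered on k, clipped at the recording edges.
        lo, hi = max(0, k - half), min(n, k + half + 1)
        window = epochs[lo:hi]
        # Sample-wise mean across the window is the artifact template.
        template = [sum(column) / len(window) for column in zip(*window)]
        corrected.append([x - t for x, t in zip(epoch, template)])
    return corrected


artifact = [0.0, 5.0, -5.0, 0.0]
epochs = [list(artifact) for _ in range(6)]   # six identical artifact epochs
cleaned = aas_subtract(epochs, window_size=4)
print(cleaned[0])
```

With perfectly repeating artifacts the template equals every epoch, so the residual is zero. In real data the residual contains the EEG plus whatever the template failed to capture.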
ANCCorrection - Adaptive Noise Cancellation
Removes residual artifacts:
from facet.correction import ANCCorrection
anc = ANCCorrection(
    filter_order=5,        # Filter order
    hp_freq=1.0,           # Highpass frequency
    use_c_extension=True,  # Use C implementation if available
)
context = anc.execute(context)
PCACorrection - PCA-based artifact removal
from facet.correction import PCACorrection
pca = PCACorrection(
    n_components=0.95,  # Keep 95% variance
    hp_freq=1.0,        # Highpass before PCA
)
context = pca.execute(context)
Evaluation Processors
ReferenceIntervalSelector - Select clean reference data for metrics
from facet.evaluation import ReferenceIntervalSelector
ref_selector = ReferenceIntervalSelector(channel="Fp1")
context = ref_selector.execute(context)
SignalIntervalSelector - Select the evaluated signal interval for metrics
from facet.evaluation import SignalIntervalSelector
sig_selector = SignalIntervalSelector(channel="Fp1")
context = sig_selector.execute(context)
# Access selected interval
eval_interval = context.metadata.custom['evaluation_interval']
SNRCalculator - Calculate Signal-to-Noise Ratio
from facet.evaluation import SNRCalculator
snr_calc = SNRCalculator()
context = snr_calc.execute(context)
# Access results
snr = context.metadata.custom['metrics']['snr']
snr_per_channel = context.metadata.custom['metrics']['snr_per_channel']
RMSCalculator - Calculate RMS ratio
from facet.evaluation import RMSCalculator
rms_calc = RMSCalculator()
context = rms_calc.execute(context)
rms_ratio = context.metadata.custom['metrics']['rms_ratio']
MedianArtifactCalculator - Calculate median artifact amplitude
from facet.evaluation import MedianArtifactCalculator
median_calc = MedianArtifactCalculator()
context = median_calc.execute(context)
median_artifact = context.metadata.custom['metrics']['median_artifact']
RMSResidualCalculator - Calculate RMS residual ratio
from facet.evaluation import RMSResidualCalculator
rms_res = RMSResidualCalculator()
context = rms_res.execute(context)
rms_residual = context.metadata.custom['metrics']['rms_residual']
FFTAllenCalculator - FFT-based quality metric (Allen 2000)
from facet.evaluation import FFTAllenCalculator
allen = FFTAllenCalculator()
context = allen.execute(context)
FFTNiazyCalculator - FFT-based quality metric (Niazy 2005)
from facet.evaluation import FFTNiazyCalculator
niazy = FFTNiazyCalculator()
context = niazy.execute(context)
MetricsReport - Print metrics report
from facet.evaluation import MetricsReport
report = MetricsReport()
report.execute(context)
Composite Processors
SequenceProcessor - Execute processors in sequence
from facet.core import SequenceProcessor
from facet.correction import AASCorrection, ANCCorrection, PCACorrection
# Group multiple processors
correction_sequence = SequenceProcessor([
    AASCorrection(window_size=30),
    ANCCorrection(filter_order=5),
    PCACorrection(n_components=0.95),
])
context = correction_sequence.execute(context)
ConditionalProcessor - Execute conditionally
from facet.core import ConditionalProcessor
from facet.correction import ANCCorrection
# Only run ANC if the context flags it as needed
conditional_anc = ConditionalProcessor(
    condition=lambda ctx: ctx.metadata.custom.get('needs_anc', False),
    processor=ANCCorrection(filter_order=5),
    else_processor=None,  # Skip if the condition is False
)
SwitchProcessor - Switch between processors
from facet.core import SwitchProcessor
from facet.correction import AASCorrection
# Select processor based on context
adaptive_correction = SwitchProcessor(
    selector=lambda ctx: "high_artifact" if ctx.metadata.artifact_length > 100 else "low_artifact",
    cases={
        "high_artifact": AASCorrection(window_size=50),
        "low_artifact": AASCorrection(window_size=20),
    },
    default=AASCorrection(window_size=30),
)
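The three composite patterns can be illustrated with plain functions acting on a dictionary. This is a sketch of the control flow only; `sequence`, `conditional`, `switch`, and `tag` are hypothetical helpers and do not reflect how FACETpy implements its composite processors.

```python
def sequence(*steps):
    """Run steps one after another, feeding each output into the next."""
    def run(ctx):
        for step in steps:
            ctx = step(ctx)
        return ctx
    return run


def conditional(condition, step, else_step=None):
    """Run step only when condition(ctx) is true; otherwise else_step or nothing."""
    def run(ctx):
        if condition(ctx):
            return step(ctx)
        return else_step(ctx) if else_step is not None else ctx
    return run


def switch(selector, cases, default=None):
    """Pick a step by the key that selector(ctx) returns."""
    def run(ctx):
        step = cases.get(selector(ctx), default)
        return step(ctx) if step is not None else ctx
    return run


def tag(label):
    # Toy step: returns a new dict with the label appended to the log.
    return lambda ctx: {**ctx, "log": ctx["log"] + [label]}


pipeline = sequence(
    tag("aas"),
    conditional(lambda ctx: ctx["needs_anc"], tag("anc")),
    switch(
        lambda ctx: "long" if ctx["artifact_length"] > 100 else "short",
        {"long": tag("window=50"), "short": tag("window=20")},
    ),
)

result = pipeline({"log": [], "needs_anc": False, "artifact_length": 120})
print(result["log"])
```

Since each composite is itself a step with the same call shape, composites can be nested freely, as the example does by placing a conditional and a switch inside a sequence.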
Processor Requirements
Each processor may require certain data to be present in the context:
Common Requirements
requires_raw - Needs MNE Raw data
requires_triggers - Needs trigger positions
Checking Requirements
# Check processor requirements
print(f"Requires raw: {processor.requires_raw}")
print(f"Requires triggers: {processor.requires_triggers}")
# Check context
if context.has_triggers():
    print("Triggers available")
else:
    print("No triggers - detector needed")
Processor Properties
Each processor has properties that describe its behavior:
processor = AASCorrection(window_size=30)
# Metadata
print(processor.name) # "aas_correction"
print(processor.description) # "Averaged Artifact Subtraction"
print(processor.version) # "1.0.0"
# Behavior flags
print(processor.requires_raw) # True
print(processor.requires_triggers) # True
print(processor.modifies_raw) # True
print(processor.parallel_safe) # True
Using Processors
Direct Execution
Execute processor directly:
processor = TriggerDetector(regex=r"\b1\b")
output_context = processor.execute(input_context)
In a Pipeline
Add to pipeline:
pipeline = Pipeline([
    Loader(path="data.edf"),
    TriggerDetector(regex=r"\b1\b"),
    AASCorrection(window_size=30),
])
Callable Interface
Processors are callable:
processor = HighPassFilter(freq=1.0)
# These are equivalent
context = processor.execute(input_context)
context = processor(input_context)
Chaining Processors
Chain processor calls with the pipe operator:
from facet import load, TriggerDetector, UpSample, TriggerAligner, AASCorrection
context = (
    load("data.edf", preload=True)
    | TriggerDetector(regex=r"\b1\b")
    | UpSample(factor=10)
    | TriggerAligner(ref_trigger_index=0)
    | AASCorrection(window_size=30)
)
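Both the callable interface and the pipe operator can be built from Python's special methods. The sketch below shows one plausible way to do it, using `__call__` on the processor and `__or__` on the context. The classes are hypothetical stand-ins, and FACETpy may implement these operators differently.

```python
class ToyContext:
    """Hypothetical context that supports `context | processor`."""

    def __init__(self, value, history=()):
        self.value = value
        self.history = tuple(history)

    def __or__(self, processor):
        # `context | processor` executes the processor on this context.
        return processor.execute(self)


class ToyProcessor:
    name = "toy"

    def execute(self, context):
        raise NotImplementedError

    def __call__(self, context):
        # Makes `processor(context)` equivalent to `processor.execute(context)`.
        return self.execute(context)


class Add(ToyProcessor):
    def __init__(self, amount):
        self.amount = amount
        self.name = f"add_{amount}"

    def execute(self, context):
        return ToyContext(context.value + self.amount,
                          context.history + (self.name,))


piped = ToyContext(1) | Add(2) | Add(10)
called = Add(10)(Add(2)(ToyContext(1)))
print(piped.value, piped.history)
```

The piped form reads left to right in execution order, whereas nested calls read inside out, which is the main reason to prefer the operator for longer chains.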
Processor Discovery
Using the Registry
Get processor by name:
from facet.core import get_processor, list_processors
# Get processor class
ProcessorClass = get_processor("aas_correction")
processor = ProcessorClass(window_size=30)
# List all processors
all_processors = list_processors()
for name, proc_class in all_processors.items():
    print(f"{name}: {proc_class.__name__}")
# List by module category
correction_processors = list_processors(category="correction")
preprocessing_processors = list_processors(category="preprocessing")
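A name-based registry of this kind is commonly built with a class decorator that stores each class in a dictionary. The following sketch shows the pattern under that assumption. All names in it (`register`, `get_toy_processor`, `list_toy_processors`, and the toy classes) are hypothetical and are not FACETpy's registration API.

```python
_REGISTRY = {}


def register(name, category):
    """Class decorator that records a processor class under a unique name."""
    def decorator(cls):
        if name in _REGISTRY:
            raise ValueError(f"Processor {name!r} is already registered")
        cls.category = category
        _REGISTRY[name] = cls
        return cls
    return decorator


def get_toy_processor(name):
    """Look up a registered class by name (raises KeyError if unknown)."""
    return _REGISTRY[name]


def list_toy_processors(category=None):
    """Return {name: class}, optionally restricted to one category."""
    return {
        name: cls for name, cls in _REGISTRY.items()
        if category is None or cls.category == category
    }


@register("toy_aas", category="correction")
class ToyAAS:
    def __init__(self, window_size=30):
        self.window_size = window_size


@register("toy_highpass", category="preprocessing")
class ToyHighPass:
    def __init__(self, freq=1.0):
        self.freq = freq


processor = get_toy_processor("toy_aas")(window_size=50)
print(sorted(list_toy_processors()))
print(list(list_toy_processors(category="correction")))
```

Looking processors up by name is mainly useful when pipelines are described in configuration files rather than in code.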
Best Practices
Validate Context
Check that context has required data:
if not context.has_triggers():
    raise ValueError("Triggers required for this processor")
Use Appropriate Parameters
Choose parameters based on your data:
# High artifact amplitude
AASCorrection(window_size=50, correlation_threshold=0.98)
# Low artifact amplitude
AASCorrection(window_size=20, correlation_threshold=0.95)
Check Results
Verify processor output:
context = processor.execute(context)
# Check that raw data is still present after modification
if processor.modifies_raw:
    assert context.get_raw() is not None
Handle Errors
Catch validation errors:
from facet.core import ProcessorValidationError
try:
    context = processor.execute(context)
except ProcessorValidationError as e:
    print(f"Prerequisites not met: {e}")
Log Processing
Track what’s being done:
from loguru import logger
logger.info(f"Applying {processor.name}")
context = processor.execute(context)
logger.info(f"Completed {processor.name}")
Processor Comparison
Correction Algorithms
| Algorithm | Speed | Effectiveness | Use Case |
|---|---|---|---|
| AAS | Fast | High | Primary |
| ANC | Medium | Medium-High | Residual |
| PCA | Slow | Medium | Alternative |
Typical workflow:
1. AAS (primary correction)
2. ANC (residual artifacts)
3. PCA (optional, if needed)
Filter Types
| Filter | Purpose | When to Use |
|---|---|---|
| HighPass | Remove drift | After correction |
| LowPass | Remove HF noise | Before analysis |
| BandPass | Frequency range | Specific analysis |
| Notch | Line noise | 50/60 Hz and harmonics |
Performance Tips
Processor Order
Optimal order:
Load data
Detect triggers
Upsample
Align
Correct (AAS → ANC → PCA)
Downsample
Filter
Evaluate
Export
Parallelization
Enable for multi-channel data:
pipeline.run(parallel=True, n_jobs=-1)
Memory Management
Use preload wisely:
# Small files - preload
Loader(path="small.edf", preload=True)
# Large files - don't preload
Loader(path="large.edf", preload=False)
Next Steps
Learn how to create Custom Processors
Understand Parallel and Channel-Sequential Processing for performance
Explore Pipeline Guide for workflow composition
Check Preprocessing API for detailed API reference