Core API
The core module provides the fundamental building blocks of FACETpy’s architecture.
Base Classes
Processor
- class facet.core.Processor[source]
Bases: ABC
Base class for all processors in the pipeline.
Processors are the fundamental building blocks of FACETpy pipelines. Each processor:
- Takes a ProcessingContext as input
- Performs a specific operation
- Returns a new ProcessingContext (immutable by default)
- Can validate prerequisites before processing
- Tracks its execution in the context history
Subclasses must implement:
- process(): Main processing logic
- validate(): Check prerequisites (optional)
Example:
    from facet.core import ProcessingContext, Processor, ProcessorValidationError

    class HighPassFilter(Processor):
        name = "highpass_filter"

        def __init__(self, freq: float):
            self.freq = freq

        def validate(self, context: ProcessingContext) -> None:
            # Fail early when there is nothing to filter
            if not context.has_raw():
                raise ProcessorValidationError("No raw data available")

        def process(self, context: ProcessingContext) -> ProcessingContext:
            # Work on a copy so the input context stays untouched
            raw = context.get_raw().copy()
            raw.filter(l_freq=self.freq, h_freq=None)
            return context.with_raw(raw)
- __call__(context: ProcessingContext) ProcessingContext[source]
Make processor callable.
- Parameters:
context – Input processing context
- Returns:
Output processing context
- execute(context: ProcessingContext) ProcessingContext[source]
Execute the processor with validation and history tracking.
- Parameters:
context – Input processing context
- Returns:
Output processing context
- Raises:
ProcessorValidationError – If validation fails
- validate(context: ProcessingContext) None[source]
Validate that prerequisites are met.
Override this method to add custom validation logic.
- Parameters:
context – Processing context
- Raises:
ProcessorValidationError – If validation fails
- abstract process(context: ProcessingContext) ProcessingContext[source]
Process the context.
This is the main method to implement in subclasses.
- Parameters:
context – Input processing context
- Returns:
Output processing context. If None is returned, the input context is used (no-op behavior).
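The contract described above (validate, then process, then record the step) can be sketched without FACETpy installed. Everything in this block is a hypothetical stand-in: `ToyContext`, `ToyProcessor`, and `ToyValidationError` are not part of `facet.core`, and the real `execute()` may differ in detail.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, replace


class ToyValidationError(Exception):
    """Stand-in for ProcessorValidationError (hypothetical)."""


@dataclass(frozen=True)
class ToyContext:
    """Stand-in for ProcessingContext: immutable data plus a history tuple."""
    data: tuple
    history: tuple = ()


class ToyProcessor(ABC):
    name = "toy"

    def validate(self, context):
        """Optional hook; raise ToyValidationError when prerequisites fail."""

    @abstractmethod
    def process(self, context):
        """Main logic; return a new context, or None for a no-op."""

    def execute(self, context):
        self.validate(context)            # 1. check prerequisites
        result = self.process(context)    # 2. run the main logic
        if result is None:                # 3. None means "use the input context"
            result = context
        # 4. record the step on a new context instead of mutating the input
        return replace(result, history=result.history + (self.name,))

    def __call__(self, context):
        return self.execute(context)


class Scale(ToyProcessor):
    name = "scale"

    def __init__(self, factor):
        self.factor = factor

    def validate(self, context):
        if not context.data:
            raise ToyValidationError("No data available")

    def process(self, context):
        scaled = tuple(x * self.factor for x in context.data)
        return replace(context, data=scaled)


start = ToyContext(data=(1.0, 2.0))
done = Scale(2.0)(start)
```

The input context `start` is left unchanged, and `done` carries both the scaled data and a one-entry history.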
ProcessingContext
- class facet.core.ProcessingContext(raw: Raw, raw_original: Raw | None = None, metadata: ProcessingMetadata | None = None)[source]
Bases: object
Processing context that wraps MNE Raw objects and provides metadata.
This class serves as the primary data container passed between processors. It provides:
- Access to current and original MNE Raw objects
- Metadata storage (triggers, parameters, etc.)
- Processing history tracking
- Estimated noise tracking
- Immutable operations (processors return new contexts)
- _raw
Current processed MNE Raw object
- _raw_original
Original unprocessed MNE Raw object
- _metadata
Processing metadata
- _history
List of processing steps
- _estimated_noise
Accumulated noise estimates
- _cache
Cache for computed values
- __init__(raw: Raw, raw_original: Raw | None = None, metadata: ProcessingMetadata | None = None)[source]
Initialize processing context.
- Parameters:
raw – MNE Raw object
raw_original – Original Raw object (if None, copies raw)
metadata – Processing metadata (if None, creates empty)
- get_data(picks: Any | None = None, **kwargs) ndarray[source]
Get data from Raw object.
- Parameters:
picks – Channel picks (MNE format)
**kwargs – Additional arguments for Raw.get_data(copy=False)
- Returns:
Data array
- property metadata: ProcessingMetadata
Get processing metadata.
- get_metric(name: str, default=None)[source]
Return a single evaluation metric stored in the context.
Shortcut for the common pattern:
ctx.metadata.custom.get('metrics', {}).get(name, default)
Typically used inside a ConditionalProcessor condition function after an evaluation step has run.
- Parameters:
name – Metric name (e.g. 'snr', 'rms_ratio').
default – Value returned when the metric is absent.
Example:
    def needs_extra_correction(ctx):
        return ctx.get_metric('snr', float('inf')) < 10
- get_history() list[ProcessingStep][source]
Get processing history.
- add_history_entry(name: str | None = None, processor_type: str = '', parameters: dict[str, Any] | None = None, *, processor_name: str | None = None) None[source]
Add entry to processing history.
- with_raw(raw: Raw, copy_metadata: bool = True) ProcessingContext[source]
Create new context with updated Raw object.
- Parameters:
raw – New Raw object
copy_metadata – Whether to copy metadata from current context
- Returns:
New ProcessingContext
- with_metadata(metadata: ProcessingMetadata) ProcessingContext[source]
Create new context with updated metadata.
- Parameters:
metadata – New metadata
- Returns:
New ProcessingContext
- with_triggers(triggers: ndarray) ProcessingContext[source]
Create new context with updated triggers.
- Parameters:
triggers – New trigger array
- Returns:
New ProcessingContext
- with_trigger_samples(triggers: ndarray | list[int], *, artifact_length: int | None = None, tr_seconds: float | None = None, trigger_regex: str | None = None, custom: dict[str, Any] | None = None, samples_are_absolute: bool = False) ProcessingContext[source]
Return a new context with trigger samples and optional artifact metadata.
- Parameters:
triggers (typing.Iterable of int) – Trigger sample positions.
artifact_length (int, optional) – Artifact window length in samples. If omitted (and tr_seconds is also omitted), FACETpy infers it as the median spacing between trigger samples when at least two triggers are available.
tr_seconds (float, optional) – Artifact window length in seconds. Converted using raw.info['sfreq'].
trigger_regex (str, optional) – Label/pattern stored in metadata for provenance.
custom (dict, optional) – Additional metadata.custom entries to merge in.
samples_are_absolute (bool, optional) – If True, trigger samples are interpreted as absolute MNE sample indices and converted to context-relative indices by subtracting raw.first_samp.
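The parameter rules above (median-spacing inference, seconds-to-samples conversion, and the `first_samp` offset) can be illustrated with a small standard-library sketch. The helper names `to_relative` and `resolve_artifact_length` are hypothetical and not FACETpy functions; rejecting `artifact_length` together with `tr_seconds` is an assumption borrowed from the `with_mne_events` description, and the rounding rule is likewise assumed.

```python
from statistics import median


def to_relative(triggers, first_samp, samples_are_absolute=False):
    """Convert absolute sample indices to context-relative ones if requested."""
    if samples_are_absolute:
        return [t - first_samp for t in triggers]
    return list(triggers)


def resolve_artifact_length(triggers, sfreq, artifact_length=None, tr_seconds=None):
    """Return the artifact length in samples, or None if it cannot be inferred."""
    if artifact_length is not None and tr_seconds is not None:
        # Assumption: the two options are treated as mutually exclusive
        raise ValueError("Pass artifact_length or tr_seconds, not both")
    if artifact_length is not None:
        return int(artifact_length)
    if tr_seconds is not None:
        # Seconds to samples via the sampling frequency (rounding is assumed)
        return int(round(tr_seconds * sfreq))
    if len(triggers) >= 2:
        # Median spacing between consecutive triggers
        spacings = [b - a for a, b in zip(triggers, triggers[1:])]
        return int(median(spacings))
    return None


absolute = [1000, 1500, 2000, 2510]
relative = to_relative(absolute, first_samp=100, samples_are_absolute=True)
inferred = resolve_artifact_length(relative, sfreq=5000.0)
from_seconds = resolve_artifact_length(relative, sfreq=5000.0, tr_seconds=0.1)
```

The median makes the inferred length robust to a single irregular trigger interval, such as the 510-sample gap in this example.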
- with_mne_events(events: ndarray, *, event: int | str | None = None, event_id: dict[str, int] | None = None, artifact_length: int | None = None, tr_seconds: float | None = None, store_event_id: bool = True) ProcessingContext[source]
Build trigger metadata from MNE events and return a new context.
- Parameters:
events (numpy.ndarray) – MNE-style event array with shape (n_events, 3).
event (int or str, optional) – Event code or event name to select. If None, events must contain exactly one unique code.
event_id (dict, optional) – Mapping from event names to integer codes (from mne.events_from_annotations). Required when event is a str.
artifact_length (int, optional) – Artifact length in samples.
tr_seconds (float, optional) – Artifact length in seconds. Takes precedence over default inference, but cannot be combined with artifact_length.
store_event_id (bool, optional) – If True and event_id is provided, stores it in metadata.custom['event_id'].
- copy(deep: bool = False) ProcessingContext[source]
Create a copy of the context.
- Parameters:
deep – If True, deep copy Raw object (expensive)
- Returns:
Copied ProcessingContext
- to_dict() dict[str, Any][source]
Serialize context to dictionary (for multiprocessing).
- Returns:
Dictionary representation
- classmethod from_dict(data: dict[str, Any]) ProcessingContext[source]
Deserialize context from dictionary.
- Parameters:
data – Dictionary representation
- Returns:
ProcessingContext instance
- __or__(other) ProcessingContext[source]
Apply a processor or callable to this context using the pipe operator.
Enables a clean chaining syntax outside of a Pipeline:
    ctx = ProcessingContext(raw)
    result = (
        ctx
        | HighPassFilter(1.0)
        | UpSample(10)
        | TriggerDetector(r"\b1\b")
        | AASCorrection()
    )
    filtered_raw = result.get_raw()
- Parameters:
other – A Processor instance or any Callable[[ProcessingContext], ProcessingContext].
- Returns:
New ProcessingContext produced by applying other.
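The pipe syntax relies on Python's `__or__` hook. A minimal sketch of the mechanism follows, with a hypothetical `PipeContext` class standing in for `ProcessingContext`; how FACETpy handles non-callable operands is not documented here, so returning `NotImplemented` is an assumption.

```python
class PipeContext:
    """Hypothetical stand-in for ProcessingContext holding one value."""

    def __init__(self, value):
        self.value = value

    def __or__(self, other):
        # `ctx | step` resolves to type(ctx).__or__(ctx, step)
        if not callable(other):
            return NotImplemented  # lets Python raise TypeError
        return other(self)


def add_one(ctx):
    return PipeContext(ctx.value + 1)


def double(ctx):
    return PipeContext(ctx.value * 2)


start = PipeContext(1)
# Evaluated left to right: double(add_one(start))
result = start | add_one | double
```

Because each step returns a new object, `start` keeps its original value while `result` holds the output of the whole chain.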
ProcessingStep
ProcessingMetadata
- class facet.core.ProcessingMetadata(triggers: ndarray | None = None, trigger_regex: str | None = None, artifact_to_trigger_offset: float = 0.0, acq_start_sample: int | None = None, acq_end_sample: int | None = None, pre_trigger_samples: int | None = None, post_trigger_samples: int | None = None, upsampling_factor: int = 10, artifact_length: int | None = None, slices_per_volume: int | None = None, volume_gaps: bool = False, custom: dict[str, Any] = <factory>)[source]
Bases: object
Metadata associated with processing context.
- copy() ProcessingMetadata[source]
Create a deep copy of metadata.
- classmethod from_dict(data: dict[str, Any]) ProcessingMetadata[source]
Deserialize metadata from dictionary.
- __init__(triggers: ndarray | None = None, trigger_regex: str | None = None, artifact_to_trigger_offset: float = 0.0, acq_start_sample: int | None = None, acq_end_sample: int | None = None, pre_trigger_samples: int | None = None, post_trigger_samples: int | None = None, upsampling_factor: int = 10, artifact_length: int | None = None, slices_per_volume: int | None = None, volume_gaps: bool = False, custom: dict[str, Any] = <factory>) None
Pipeline
Pipeline
- class facet.core.Pipeline(processors: list[Processor | Callable], name: str | None = None)[source]
Bases: object
Pipeline for executing sequences of processors.
A pipeline orchestrates the execution of multiple processors in sequence, handles errors, provides progress tracking, and supports parallelization.
Example:
    pipeline = Pipeline([
        Loader("data.edf"),
        HighPassFilter(freq=1.0),
        UpSample(factor=10),
        TriggerDetector(regex=r"\btrigger\b"),
        AASCorrection(),
        EDFExporter("output.edf")
    ])
    result = pipeline.run()
    if result.was_successful():
        print(f"Completed in {result.execution_time:.2f}s")
- processors
List of processors to execute
- name
Optional pipeline name
- __init__(processors: list[Processor | Callable], name: str | None = None)[source]
Initialize pipeline.
Plain callables (Callable[[ProcessingContext], ProcessingContext]) are automatically wrapped in a LambdaProcessor so they can be used as inline steps without ceremony:
    pipeline = Pipeline([
        Loader("data.edf"),
        HighPassFilter(1.0),
        lambda ctx: (print(ctx.get_sfreq()) or ctx),
        AASCorrection(),
    ])
- Parameters:
processors – List of Processor instances or plain callables to execute in order.
name – Optional pipeline name (for logging)
- run(initial_context: ProcessingContext | None = None, parallel: bool = False, n_jobs: int = -1, channel_sequential: bool = False, show_progress: bool = True) PipelineResult[source]
Execute the pipeline.
- Parameters:
initial_context – Initial context (if None, first processor creates it)
parallel – Enable parallel execution for compatible processors
n_jobs – Number of parallel jobs (-1 for all CPUs)
channel_sequential – Run consecutive channel-wise processors (channel_wise = True) as a single per-channel pass. For each channel the full local sequence executes before the next channel starts:
    for each channel: channel → HP-filter → UpSample → AAS → DownSample → store
The output array is pre-allocated at the final sfreq so the full high-sfreq intermediate data never exists for all channels at once.
Processors with run_once = True (e.g. TriggerAligner) are included in the per-channel pass but only execute for the first channel; all subsequent channels skip them.
This flag is independent of parallel_safe and has no relation to multiprocessing. Takes precedence over parallel for eligible processors.
show_progress – Show progress bar
- Returns:
PipelineResult containing final context and metadata
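The memory argument behind `channel_sequential` can be made concrete with a toy model. In this sketch, channels are plain lists, upsampling is naive sample repetition (a placeholder, not how FACETpy resamples), and `run_channel_sequential` is a hypothetical helper that also reports the largest intermediate it ever held.

```python
def upsample(samples, factor=10):
    """Placeholder upsampling: repeat each sample `factor` times."""
    return [s for s in samples for _ in range(factor)]


def subtract_one(samples):
    """Placeholder for a correction step working at the high rate."""
    return [s - 1 for s in samples]


def downsample(samples, factor=10):
    """Placeholder downsampling: keep every `factor`-th sample."""
    return samples[::factor]


def run_channel_sequential(channels, steps):
    output = [None] * len(channels)   # pre-allocated, one slot per channel
    peak = 0                          # largest intermediate length observed
    for index, samples in enumerate(channels):
        current = list(samples)
        for step in steps:            # the full chain runs for this channel
            current = step(current)
            peak = max(peak, len(current))
        output[index] = current       # only the final-rate result is stored
    return output, peak


channels = [[1, 2, 3], [4, 5, 6]]
output, peak = run_channel_sequential(channels, [upsample, subtract_one, downsample])
```

Here the peak intermediate is 30 samples (one upsampled channel), whereas running each step across all channels first would hold 60 upsampled samples at once.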
- add(processor: Processor | Callable) Pipeline[source]
Add a processor or callable to the pipeline (fluent API).
- Parameters:
processor – Processor instance or callable.
- Returns:
Self for chaining
- extend(processors: list[Processor | Callable]) Pipeline[source]
Extend pipeline with multiple processors or callables.
- Parameters:
processors – List of processors or callables to add.
- Returns:
Self for chaining
- insert(index: int, processor: Processor | Callable) Pipeline[source]
Insert a processor or callable at a specific position.
- Parameters:
index – Position to insert
processor – Processor instance or callable.
- Returns:
Self for chaining
- remove(index: int) Pipeline[source]
Remove processor at index.
- Parameters:
index – Index to remove
- Returns:
Self for chaining
- validate_all(context: ProcessingContext) list[str][source]
Validate all processors against a context.
Useful for checking if a pipeline can run before actually running it.
- Parameters:
context – Context to validate against
- Returns:
List of validation error messages (empty if all valid)
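A dry-run validation of this kind collects failures instead of stopping at the first one. The sketch below shows the pattern with hypothetical names (`ToyValidationError`, `NeedsKey`, `collect_validation_errors`); the message format is an assumption, not FACETpy's actual output.

```python
class ToyValidationError(Exception):
    """Hypothetical stand-in for ProcessorValidationError."""


class NeedsKey:
    """Toy processor whose validate() requires a key in a dict context."""

    def __init__(self, key):
        self.name = f"needs_{key}"
        self.key = key

    def validate(self, context):
        if self.key not in context:
            raise ToyValidationError(f"missing {self.key!r}")


def collect_validation_errors(processors, context):
    """Validate every processor and gather messages instead of stopping early."""
    errors = []
    for index, processor in enumerate(processors):
        try:
            processor.validate(context)
        except ToyValidationError as exc:
            errors.append(f"[{index}] {processor.name}: {exc}")
    return errors


errors = collect_validation_errors(
    [NeedsKey("raw"), NeedsKey("triggers")],
    {"raw": object()},
)
```

An empty list means every processor accepted the context, which matches the documented return convention.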
- describe() str[source]
Get human-readable pipeline description.
- Returns:
Multi-line string describing pipeline
- to_dict() dict[str, Any][source]
Serialize pipeline to dictionary.
- Returns:
Dictionary representation
- map(inputs: list[str | ProcessingContext], loader_factory: Callable[[str], Processor] | None = None, parallel: bool = False, n_jobs: int = -1, on_error: str = 'continue', keep_raw: bool = True) BatchResult[source]
Run the pipeline on multiple inputs and return a result per input.
Each input can be:
- A ProcessingContext, passed directly as initial_context.
- A file path string; a fresh Loader is created automatically for each path via loader_factory.
Note
Do not add a Loader processor to the pipeline when using map(). Loading is handled outside the pipeline so that each file gets its own isolated loader instance.
- Parameters:
inputs – List of file paths or ProcessingContext objects.
loader_factory – Callable[[path], Processor] that creates a fresh loader for each path string. Defaults to lambda p: Loader(path=p, preload=True).
parallel – Whether to pass parallel=True to each pipeline.run().
n_jobs – Passed through to pipeline.run().
on_error – "continue" (default): log failures and keep going; "raise": re-raise the first error encountered.
keep_raw – If False, the Raw data is released from each result after the pipeline run completes, keeping only metrics and history in memory. Set to False when processing many files and you only need summary statistics. Defaults to True.
- Returns:
BatchResult containing one PipelineResult per input, in the same order. It behaves like a plain list but also offers BatchResult.print_summary() and BatchResult.summary_df.
Example:
    pipeline = Pipeline([
        TriggerDetector(regex=r"\b1\b"),
        UpSample(factor=10),
        AASCorrection(window_size=30),
        DownSample(factor=10),
        SNRCalculator(),
    ])
    results = pipeline.map(
        ["sub-01.edf", "sub-02.edf", "sub-03.edf"],
        keep_raw=False,
    )
    results.print_summary()
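The two `on_error` modes amount to a choice between recording a failure and propagating it. A minimal sketch of that control flow follows, assuming a hypothetical `run_batch` helper and plain dicts in place of `PipelineResult`; logging is omitted.

```python
def run_batch(inputs, run_one, on_error="continue"):
    """Run `run_one` per input and return one result record per input, in order."""
    if on_error not in ("continue", "raise"):
        raise ValueError(f"Unknown on_error mode: {on_error!r}")
    results = []
    for item in inputs:
        try:
            value = run_one(item)
        except Exception as exc:
            if on_error == "raise":
                raise                      # propagate the first failure
            results.append(
                {"input": item, "success": False, "value": None, "error": exc}
            )
        else:
            results.append(
                {"input": item, "success": True, "value": value, "error": None}
            )
    return results


def fake_pipeline(path):
    """Toy stand-in for a pipeline run that fails on unreadable files."""
    if path.endswith(".bad"):
        raise OSError(f"cannot read {path}")
    return len(path)


batch = run_batch(["a.edf", "b.bad", "c.edf"], fake_pipeline)
```

Keeping a record for failed inputs preserves the one-result-per-input ordering that the return value promises.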
PipelineResult
- class facet.core.PipelineResult(context: ProcessingContext | None, success: bool = True, error: Exception | None = None, execution_time: float = 0.0, failed_processor: str | None = None, failed_processor_index: int | None = None)[source]
Bases: object
Result of pipeline execution.
Contains the final context and metadata about execution.
- __init__(context: ProcessingContext | None, success: bool = True, error: Exception | None = None, execution_time: float = 0.0, failed_processor: str | None = None, failed_processor_index: int | None = None)[source]
Initialize pipeline result.
- Parameters:
context – Final processing context
success – Whether pipeline completed successfully
error – Exception if pipeline failed
execution_time – Total execution time in seconds
- get_context() ProcessingContext[source]
Get final processing context.
- property metrics: dict[str, Any]
Shortcut to evaluation metrics stored in context.
Returns the metrics dict from context.metadata.custom, or an empty dict if no metrics have been calculated yet.
Example:
    result = pipeline.run()
    print(result.metrics['snr'])
- property metrics_df
Return scalar evaluation metrics as a pandas.Series.
Nested dicts (e.g. fft_allen) are flattened with _ separators. Returns None if pandas is not available.
Example:
    result = pipeline.run()
    print(result.metrics_df)
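Flattening nested metric dicts with `_` separators can be sketched in a few lines. `flatten_metrics` is a hypothetical helper, and the inner keys under `fft_allen` are made up for illustration; the real property may also filter out non-scalar values.

```python
def flatten_metrics(metrics, prefix=""):
    """Flatten nested dicts into one level, joining keys with underscores."""
    flat = {}
    for key, value in metrics.items():
        name = f"{prefix}_{key}" if prefix else str(key)
        if isinstance(value, dict):
            # Recurse, carrying the joined key as the new prefix
            flat.update(flatten_metrics(value, name))
        else:
            flat[name] = value
    return flat


nested = {"snr": 12.5, "fft_allen": {"slice": 0.8, "volume": 0.9}}
flat = flatten_metrics(nested)
```

A flat mapping like `flat` can be passed directly to `pandas.Series(flat)` when pandas is installed.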
- metric(name: str, default=None)[source]
Return a single evaluation metric by name.
Shortcut for result.metrics.get(name, default) that avoids having to remember the dict key and provides a clean default.
- Parameters:
name – Metric name (e.g. 'snr', 'rms_ratio').
default – Value returned when the metric is absent.
Example:
    # Pass a numeric default so the format spec does not fail on a missing metric
    snr = result.metric('snr', float('nan'))
    print(f"SNR = {snr:.2f} dB")
- print_metrics() None[source]
Print a formatted table of all evaluation metrics.
Uses rich for colour and alignment when available.
Example:
    result = pipeline.run()
    result.print_metrics()
- print_summary() None[source]
Print a one-line summary of the pipeline result.
Shows success/failure, execution time, and any key metrics (SNR, RMS ratio, RMS residual) that were calculated.
Example:
    result = pipeline.run()
    result.print_summary()
BatchResult
- class facet.core.BatchResult(results: list[PipelineResult], labels: list[str] | None = None)[source]
Bases: object
Result of Pipeline.map(...): a list of PipelineResult objects with built-in helpers for quick inspection.
It behaves like a regular list (iteration, indexing, len), so existing code that iterates over the return value of Pipeline.map() continues to work without changes.
Example:
    results = pipeline.map(files, loader_factory=lambda p: Loader(p))
    results.print_summary()
    df = results.summary_df
- print_summary() None[source]
Print a formatted table with one row per input file.
Columns include the file label, success/failure status, execution time, and any scalar metrics that were computed.
Example:
    results = pipeline.map(files, loader_factory=...)
    results.print_summary()
- property summary_df
Return a pandas.DataFrame with one row per input.
Columns: file, success, execution_time, plus one column per scalar metric. Returns None when pandas is not installed.
PipelineBuilder
- class facet.core.PipelineBuilder(name: str | None = None)[source]
Bases: object
Fluent builder for constructing pipelines.
Example:
    pipeline = (PipelineBuilder()
        .add(Loader("data.edf"))
        .highpass(1.0)
        .upsample(10)
        .detect_triggers(r"\btrigger\b")
        .aas_correction()
        .export_edf("output.edf")
        .build())
- __init__(name: str | None = None)[source]
Initialize builder.
- Parameters:
name – Optional pipeline name
- add(processor: Processor) PipelineBuilder[source]
Add custom processor.
- Parameters:
processor – Processor to add
- Returns:
Self for chaining
- add_if(condition: bool, processor: Processor) PipelineBuilder[source]
Add processor conditionally.
- Parameters:
condition – Whether to add processor
processor – Processor to add
- Returns:
Self for chaining
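The fluent style works because every builder method returns the builder itself. A minimal sketch with a hypothetical `ToyBuilder` that collects strings instead of processors:

```python
class ToyBuilder:
    """Hypothetical stand-in for PipelineBuilder that collects step labels."""

    def __init__(self, name=None):
        self.name = name
        self._steps = []

    def add(self, step):
        self._steps.append(step)
        return self                 # returning self enables chaining

    def add_if(self, condition, step):
        if condition:               # skipped steps leave the chain intact
            self._steps.append(step)
        return self

    def build(self):
        return list(self._steps)    # hand out a copy, keep the builder reusable


use_notch = False
steps = (
    ToyBuilder("demo")
    .add("load")
    .add_if(use_notch, "notch")
    .add("correct")
    .build()
)
```

`add_if` keeps optional steps inside one expression, so a configuration flag does not force the chain to be split into separate statements.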
Composite Processors
SequenceProcessor
- class facet.core.SequenceProcessor(processors: list[Processor])[source]
Bases: Processor
Processor that executes multiple processors in sequence.
This is a composite processor that allows grouping multiple processing steps into a single unit.
Example:
    preprocessing = SequenceProcessor([
        HighPassFilter(freq=1.0),
        UpSample(factor=10),
        NotchFilter(freqs=[50])
    ])
- __init__(processors: list[Processor])[source]
Initialize sequence processor.
- Parameters:
processors – List of processors to execute in order
- validate(context: ProcessingContext) None[source]
Validate all child processors.
- process(context: ProcessingContext) ProcessingContext[source]
Execute all processors in sequence.
ConditionalProcessor
- class facet.core.ConditionalProcessor(condition: callable, processor: Processor, else_processor: Processor | None = None)[source]
Bases: Processor
Processor that executes conditionally based on context.
Example:
    ConditionalProcessor(
        condition=lambda ctx: ctx.metadata.custom.get("needs_upsampling"),
        processor=UpSample(factor=10),
        else_processor=None  # Skip if condition is False
    )
- __init__(condition: callable, processor: Processor, else_processor: Processor | None = None)[source]
Initialize conditional processor.
- Parameters:
condition – Callable that takes context and returns bool
processor – Processor to execute if condition is True
else_processor – Processor to execute if condition is False (optional)
- validate(context: ProcessingContext) None[source]
Validate based on condition.
- process(context: ProcessingContext) ProcessingContext[source]
Execute conditionally.
SwitchProcessor
- class facet.core.SwitchProcessor(selector: callable, cases: dict[str, Processor], default: Processor | None = None)[source]
Bases: Processor
Processor that switches between multiple processors based on selector.
Example:
    SwitchProcessor(
        selector=lambda ctx: "motion" if ctx.has_motion_data() else "aas",
        cases={
            "aas": AASCorrection(),
            "motion": MotionBasedCorrection()
        },
        default=AASCorrection()
    )
- __init__(selector: callable, cases: dict[str, Processor], default: Processor | None = None)[source]
Initialize switch processor.
- Parameters:
selector – Callable that takes context and returns case key
cases – Dictionary mapping case keys to processors
default – Default processor if selector returns unknown key
- validate(context: ProcessingContext) None[source]
Validate selected processor.
- process(context: ProcessingContext) ProcessingContext[source]
Execute selected processor.
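The dispatch logic of the conditional and switch composites can be sketched with plain functions operating on dict contexts. `conditional`, `switch`, and `tag` are hypothetical helpers; passing the context through unchanged when no branch applies is an assumption about the behavior without an else or default branch.

```python
def conditional(condition, step, else_step=None):
    """Build a step that runs `step` or `else_step` depending on the context."""
    def run(context):
        if condition(context):
            return step(context)
        if else_step is not None:
            return else_step(context)
        return context                       # assumed: skip means pass-through
    return run


def switch(selector, cases, default=None):
    """Build a step that picks one of several steps by key."""
    def run(context):
        chosen = cases.get(selector(context), default)
        if chosen is None:
            return context                   # assumed: unknown key, no default
        return chosen(context)
    return run


def tag(label):
    """Toy step that records which branch ran."""
    def run(context):
        return {**context, "applied": label}
    return run


maybe_upsample = conditional(
    lambda ctx: ctx.get("needs_upsampling", False),
    tag("upsample"),
)
choose_correction = switch(
    lambda ctx: "motion" if ctx.get("has_motion") else "aas",
    {"aas": tag("aas"), "motion": tag("motion")},
)

skipped = maybe_upsample({})
upsampled = maybe_upsample({"needs_upsampling": True})
with_motion = choose_correction({"has_motion": True})
```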
NoOpProcessor
LambdaProcessor
- class facet.core.LambdaProcessor(name: str, func: callable)[source]
Bases: Processor
Processor that executes a lambda function.
Useful for quick custom operations without creating a full processor class.
Example:
    LambdaProcessor(
        name="remove_bad_channels",
        func=lambda ctx: ctx.with_raw(
            ctx.get_raw().copy().drop_channels(["EKG"])
        )
    )
- __init__(name: str, func: callable)[source]
Initialize lambda processor.
- Parameters:
name – Processor name
func – Function that takes context and returns new context
- process(context: ProcessingContext) ProcessingContext[source]
Execute lambda function.
Registry
- facet.core.register_processor(processor_class: type[Processor] | None = None, name: str | None = None, force: bool = False)[source]
Decorator to register a processor.
Can be used with or without arguments.
Example:
    @register_processor
    class MyProcessor(Processor):
        name = "my_processor"
        ...

    @register_processor(name="custom_name")
    class MyProcessor(Processor):
        ...
- Parameters:
processor_class – Processor class (when used without arguments)
name – Custom name (overrides class.name)
force – Force registration even if name exists
- Returns:
Decorator function or processor class
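A decorator that works both bare and with arguments usually checks whether it received the class directly. The sketch below mirrors the documented parameter names but is not the FACETpy implementation: `REGISTRY` and `register` are hypothetical, and raising `ValueError` on a duplicate name is an assumption.

```python
REGISTRY = {}


def register(processor_class=None, name=None, force=False):
    """Register a class, usable as @register or @register(name=...)."""
    def decorator(cls):
        key = name or getattr(cls, "name", cls.__name__)
        if key in REGISTRY and not force:
            raise ValueError(f"Processor {key!r} is already registered")
        REGISTRY[key] = cls
        return cls                     # hand the class back unchanged

    if processor_class is None:
        return decorator               # called with arguments: @register(...)
    return decorator(processor_class)  # called bare: @register


@register
class Denoise:
    name = "denoise"


@register(name="custom_name")
class Smooth:
    name = "smooth"
```

With this layout, `@register` receives the class as its first positional argument, while `@register(name=...)` first returns the inner decorator, which then receives the class.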
Parallel Execution
ParallelExecutor
- class facet.core.ParallelExecutor(n_jobs: int = -1, backend: str = 'multiprocessing', verbose: bool = True)[source]
Bases: object
Executor for parallel processing of channels or epochs.
This class handles multiprocessing for processors that support it, typically for channel-wise or epoch-wise operations.
Example:
    executor = ParallelExecutor(n_jobs=4)
    result_context = executor.execute(processor, context)
- __init__(n_jobs: int = -1, backend: str = 'multiprocessing', verbose: bool = True)[source]
Initialize parallel executor.
- Parameters:
n_jobs – Number of parallel jobs (-1 for all CPUs, -2 for all but one)
backend – Parallel backend (“multiprocessing”, “threading”, or “serial”)
verbose – Show progress messages
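The negative `n_jobs` values follow a familiar convention in which -1 means all CPUs and -2 means all but one. A minimal sketch of resolving such a value into a worker count is shown here; `resolve_n_jobs` is a hypothetical helper, and the handling of 0 and of very negative values is an assumption.

```python
import os


def resolve_n_jobs(n_jobs, cpu_count=None):
    """Translate an n_jobs setting into a concrete number of workers."""
    cpus = cpu_count or os.cpu_count() or 1
    if n_jobs == 0:
        raise ValueError("n_jobs must not be 0")
    if n_jobs < 0:
        # -1 -> cpus, -2 -> cpus - 1, and so on; never drop below one worker
        return max(1, cpus + 1 + n_jobs)
    return n_jobs


all_cpus = resolve_n_jobs(-1, cpu_count=8)
all_but_one = resolve_n_jobs(-2, cpu_count=8)
explicit = resolve_n_jobs(4, cpu_count=8)
```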
- execute(processor: Processor, context: ProcessingContext) ProcessingContext[source]
Execute processor in parallel if possible.
This method attempts to parallelize the processor execution. If parallelization is not applicable, falls back to serial execution.
- Parameters:
processor – Processor to execute
context – Input context
- Returns:
Output context
ChannelSequentialExecutor
- class facet.core.ChannelSequentialExecutor[source]
Bases: object
Execute a sequence of processors one channel at a time.
For each data channel the full processor sequence runs to completion before the next channel starts. This ensures that large intermediate representations (e.g. 10x upsampled data) exist for only one channel at a time.
Processors with run_once = True execute only for the first channel; all subsequent channels skip them and inherit the metadata produced by that first run.
Non-data channels (stim, misc, …) are handled separately: copied unchanged when the sampling rate stays the same, or resampled via MNE when the batch changes the sampling rate.
The live console (when active) receives fine-grained channel and processor progress updates via the modern console manager.
Example:
    executor = ChannelSequentialExecutor()
    result = executor.execute(
        [HighPassFilter(1.0), UpSample(10), AASCorrection(), DownSample(10)],
        context,
    )
- execute(processors: list[Processor], context: ProcessingContext) ProcessingContext[source]
Run processors on context one channel at a time.
- Parameters:
processors (list of Processor) – Processors to execute in order for every channel.
context (ProcessingContext) – Input context containing the full multi-channel dataset.
- Returns:
Merged output context with all channels processed.
- Return type:
ProcessingContext
Exceptions
- class facet.core.ProcessorError[source]
Bases: Exception
Base exception for all processor-related errors.
- class facet.core.ProcessorValidationError[source]
Bases: ProcessorError
Raised when processor validation fails.
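Because the validation error derives from the base error, a handler for the base class also catches validation failures, so the more specific handler has to come first. The sketch below defines local stand-ins that mirror the documented hierarchy rather than importing `facet.core`; `run_step` and the message prefixes are hypothetical.

```python
class ProcessorError(Exception):
    """Local stand-in mirroring the documented base exception."""


class ProcessorValidationError(ProcessorError):
    """Local stand-in mirroring the documented validation exception."""


def run_step(step, context):
    """Run one step and return (result, error_message)."""
    try:
        return step(context), None
    except ProcessorValidationError as exc:   # most specific handler first
        return None, f"validation: {exc}"
    except ProcessorError as exc:             # any other processor failure
        return None, f"processor: {exc}"


def needs_raw(context):
    if "raw" not in context:
        raise ProcessorValidationError("No raw data available")
    return context


def always_fails(context):
    raise ProcessorError("correction diverged")


ok = run_step(needs_raw, {"raw": [0.0]})
invalid = run_step(needs_raw, {})
failed = run_step(always_fails, {})
```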