Top-Level API
The facet package re-exports the most commonly used classes and helper
functions.
Convenience Functions
- facet.load(path: str, **kwargs) -> ProcessingContext
Load an EEG file and return a ProcessingContext.
Automatically detects the file format from the extension (EDF, BDF, GDF, BrainVision, EEGLAB, FIF). For BIDS datasets, use BIDSLoader.
- Parameters:
path (str) – Path to the EEG file; the extension selects the format.
**kwargs – Additional keyword arguments (for example preload).
- Returns:
Context containing the loaded recording.
- Return type:
ProcessingContext
Examples
>>> import facet
>>> ctx = facet.load("./data/subject01.edf", preload=True)
>>> ctx = ctx | facet.HighPassFilter(1.0) | facet.TriggerDetector(r"\b1\b")
- facet.export(context: ProcessingContext, path: str, **kwargs) -> ProcessingContext
Export a ProcessingContext using extension-based auto routing.
Uses Exporter to dispatch export handling from the path extension (for example .edf, .set, .vhdr).
- Parameters:
context (facet.core.ProcessingContext) – Input context that contains the Raw data to export.
path (str) – Destination path; extension selects the exporter implementation.
**kwargs – Additional keyword arguments forwarded to Exporter (for example overwrite).
- Returns:
The unchanged input context after export.
- Return type:
ProcessingContext
Examples
>>> import facet
>>> ctx = facet.load("./data/subject01.edf", preload=True)
>>> ctx = facet.export(ctx, "./data/subject01_corrected.set", overwrite=True)
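Extension-based routing of the kind described for facet.export can be pictured as a lookup from file suffix to a handler. The block below is a minimal sketch of that general idea, not FACETpy's Exporter: the registry, the handler functions, and the error raised for an unknown extension are all hypothetical.

```python
from pathlib import Path
from typing import Callable, Dict


# Hypothetical handlers; a real exporter would write a file to disk.
def write_edf(path: str) -> str:
    return f"EDF writer -> {path}"


def write_eeglab(path: str) -> str:
    return f"EEGLAB writer -> {path}"


def write_brainvision(path: str) -> str:
    return f"BrainVision writer -> {path}"


# Assumed mapping from lower-case suffix to handler (illustrative only).
HANDLERS: Dict[str, Callable[[str], str]] = {
    ".edf": write_edf,
    ".set": write_eeglab,
    ".vhdr": write_brainvision,
}


def dispatch_export(path: str) -> str:
    """Pick a handler from the path extension, case-insensitively."""
    suffix = Path(path).suffix.lower()
    try:
        handler = HANDLERS[suffix]
    except KeyError:
        raise ValueError(f"No exporter registered for {suffix!r}") from None
    return handler(path)
```

Keeping the routing in a single table means that supporting another format is a one-line change, which is the usual motivation for this pattern.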
- facet.create_standard_pipeline(input_path: str, output_path: str, start_sample: int = 0, end_sample: int = None, trigger_regex: str = '\\b1\\b', artifact_to_trigger_offset: float = -0.005, upsample_factor: int = 10, use_anc: bool = True, use_pca: bool = True, evaluate: bool = False, plot: bool = False, plot_kwargs: dict | None = None) -> Pipeline
Create a standard fMRI artifact correction pipeline.
- Parameters:
input_path – Path to input EEG file
output_path – Path to output corrected file
trigger_regex – Regex pattern for trigger detection
artifact_to_trigger_offset – Time offset between artifact and trigger (seconds)
upsample_factor – Upsampling factor for alignment
use_anc – Whether to apply ANC correction
use_pca – Whether to apply PCA correction
evaluate – Append all standard evaluation metrics (SNR, RMS, Median, FFT) and a MetricsReport step automatically.
plot – Append a RawPlotter step after evaluation. Implies evaluate=True.
plot_kwargs – Extra keyword arguments forwarded to RawPlotter.
- Returns:
Configured Pipeline instance
Example:
    from facet import create_standard_pipeline

    # Minimal: just correction + export
    pipeline = create_standard_pipeline("data.edf", "corrected.edf")
    result = pipeline.run()

    # With automatic evaluation
    pipeline = create_standard_pipeline(
        "data.edf", "corrected.edf",
        evaluate=True,
    )
    result = pipeline.run()
    print(result.metrics)

    # With evaluation and interactive plot
    pipeline = create_standard_pipeline(
        "data.edf", "corrected.edf",
        evaluate=True,
        plot=True,
        plot_kwargs={"channel": "Fp1", "start": 5.0, "duration": 10.0},
    )
    result = pipeline.run()
Configuration
- facet.get_config(key: str | None = None) -> Any
Return the resolved FACETpy configuration.
- Parameters:
key (str, optional) – Optional key to retrieve. If None, the full config dict is returned.
- facet.set_config(config: Mapping[str, Any] | None = None, /, *, apply_logging: bool = True, **kwargs) -> dict[str, Any]
Set in-process runtime config overrides (highest precedence).
- Parameters:
config (mapping, optional) – Optional mapping with config keys/values.
apply_logging (bool) – Reconfigure FACETpy logging immediately after applying overrides.
**kwargs – Additional key/value pairs (merged with config).
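Runtime overrides are documented as the highest-precedence layer. One common way to model layered resolution is collections.ChainMap, where lookups fall through from the first mapping to the later ones. The sketch below shows that general pattern only; it is not FACETpy's resolver, and the layer names and keys (log_level, n_jobs) are invented for illustration.

```python
from collections import ChainMap
from typing import Any, Dict, Optional

# Hypothetical layers: the first mapping wins, later ones act as fallbacks.
_defaults: Dict[str, Any] = {"log_level": "INFO", "n_jobs": 1}
_runtime_overrides: Dict[str, Any] = {}
_resolved = ChainMap(_runtime_overrides, _defaults)


def set_overrides(config: Optional[Dict[str, Any]] = None, **kwargs: Any) -> Dict[str, Any]:
    """Merge a mapping and keyword pairs into the runtime layer."""
    _runtime_overrides.update(config or {})
    _runtime_overrides.update(kwargs)
    return dict(_resolved)


def get_value(key: Optional[str] = None) -> Any:
    """Return one resolved value, or the full resolved dict when key is None."""
    return dict(_resolved) if key is None else _resolved[key]
```

Because the override layer is a separate dict, clearing it restores the defaults without any bookkeeping.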
Commonly Used Re-Exports
Core
- class facet.Pipeline(processors: list[Processor | Callable], name: str | None = None)
Bases: object
Pipeline for executing sequences of processors.
A pipeline orchestrates the execution of multiple processors in sequence, handles errors, provides progress tracking, and supports parallelization.
Example:
    pipeline = Pipeline([
        Loader("data.edf"),
        HighPassFilter(freq=1.0),
        UpSample(factor=10),
        TriggerDetector(regex=r"\btrigger\b"),
        AASCorrection(),
        EDFExporter("output.edf"),
    ])
    result = pipeline.run()
    if result.was_successful():
        print(f"Completed in {result.execution_time:.2f}s")
- processors
List of processors to execute
- name
Optional pipeline name
- __init__(processors: list[Processor | Callable], name: str | None = None)
Initialize pipeline.
Plain callables (Callable[[ProcessingContext], ProcessingContext]) are automatically wrapped in a LambdaProcessor so they can be used as inline steps without ceremony:
    pipeline = Pipeline([
        Loader("data.edf"),
        HighPassFilter(1.0),
        lambda ctx: (print(ctx.get_sfreq()) or ctx),
        AASCorrection(),
    ])
- Parameters:
processors – List of Processor instances or plain callables to execute in order.
name – Optional pipeline name (for logging)
- run(initial_context: ProcessingContext | None = None, parallel: bool = False, n_jobs: int = -1, channel_sequential: bool = False, show_progress: bool = True) -> PipelineResult
Execute the pipeline.
- Parameters:
initial_context – Initial context (if None, first processor creates it)
parallel – Enable parallel execution for compatible processors
n_jobs – Number of parallel jobs (-1 for all CPUs)
channel_sequential – Run consecutive channel-wise processors (channel_wise = True) as a single per-channel pass. For each channel the full local sequence executes before the next channel starts:
    for each channel: channel → HP-filter → UpSample → AAS → DownSample → store
The output array is pre-allocated at the final sfreq so the full high-sfreq intermediate data never exists for all channels at once.
Processors with run_once = True (e.g. TriggerAligner) are included in the per-channel pass but only execute for the first channel; all subsequent channels skip them.
This flag is independent of parallel_safe and has no relation to multiprocessing. Takes precedence over parallel for eligible processors.
show_progress – Show progress bar
- Returns:
PipelineResult containing final context and metadata
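The memory argument behind channel_sequential can be illustrated with a toy model. The sketch below is an assumption-laden illustration, not FACETpy code: channels are plain Python lists, upsample and downsample are simplistic stand-ins, and run_channel_sequential is a hypothetical helper that also reports the largest intermediate length held at any one time.

```python
from typing import Callable, List, Sequence, Tuple

Channel = List[float]
Step = Callable[[Channel], Channel]


def upsample(x: Channel, factor: int = 2) -> Channel:
    # Toy upsampling: repeat each sample `factor` times.
    return [v for v in x for _ in range(factor)]


def downsample(x: Channel, factor: int = 2) -> Channel:
    # Toy downsampling: keep every `factor`-th sample.
    return x[::factor]


def run_channel_sequential(
    data: Sequence[Channel], steps: Sequence[Step]
) -> Tuple[List[Channel], int]:
    """Run all steps on one channel before touching the next.

    Returns the processed channels and the largest intermediate length
    that was held for any single channel.
    """
    out: List[Channel] = [[] for _ in data]  # output slots allocated up front
    peak = 0
    for i, channel in enumerate(data):
        current = list(channel)
        for step in steps:
            current = step(current)
            peak = max(peak, len(current))
        out[i] = current  # only the final-rate result is stored
    return out, peak


data = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]
steps = [upsample, lambda x: [v - 1.0 for v in x], downsample]
out, peak = run_channel_sequential(data, steps)
```

In this toy run the upsampled intermediate exists for one channel at a time (length 6), whereas a step-by-step pass over all channels would hold both upsampled channels (12 samples) at once.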
- add(processor: Processor | Callable) -> Pipeline
Add a processor or callable to the pipeline (fluent API).
- Parameters:
processor – Processor instance or callable.
- Returns:
Self for chaining
- extend(processors: list[Processor | Callable]) -> Pipeline
Extend pipeline with multiple processors or callables.
- Parameters:
processors – List of processors or callables to add.
- Returns:
Self for chaining
- insert(index: int, processor: Processor | Callable) -> Pipeline
Insert a processor or callable at a specific position.
- Parameters:
index – Position to insert
processor – Processor instance or callable.
- Returns:
Self for chaining
- remove(index: int) -> Pipeline
Remove processor at index.
- Parameters:
index – Index to remove
- Returns:
Self for chaining
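add, extend, insert, and remove are all documented as returning the pipeline itself, which is what allows calls to be chained. The sketch below mimics only that chaining style with a hypothetical TinyPipeline class; it is not the FACETpy Pipeline and its steps are ordinary string functions chosen for illustration.

```python
from typing import Callable, List, Optional


class TinyPipeline:
    """Hypothetical stand-in that mimics the chaining style only."""

    def __init__(self, steps: Optional[List[Callable]] = None):
        self.steps: List[Callable] = list(steps or [])

    def add(self, step: Callable) -> "TinyPipeline":
        self.steps.append(step)
        return self  # returning self is what enables chaining

    def extend(self, steps: List[Callable]) -> "TinyPipeline":
        self.steps.extend(steps)
        return self

    def insert(self, index: int, step: Callable) -> "TinyPipeline":
        self.steps.insert(index, step)
        return self

    def remove(self, index: int) -> "TinyPipeline":
        del self.steps[index]
        return self


p = (
    TinyPipeline()
    .add(str.strip)
    .extend([str.lower, str.title])
    .insert(0, str.upper)
    .remove(1)
)
names = [s.__name__ for s in p.steps]
```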
- validate_all(context: ProcessingContext) -> list[str]
Validate all processors against a context.
Useful for checking if a pipeline can run before actually running it.
- Parameters:
context – Context to validate against
- Returns:
List of validation error messages (empty if all valid)
- describe() -> str
Get human-readable pipeline description.
- Returns:
Multi-line string describing pipeline
- to_dict() -> dict[str, Any]
Serialize pipeline to dictionary.
- Returns:
Dictionary representation
- map(inputs: list[str | ProcessingContext], loader_factory: Callable[[str], Processor] | None = None, parallel: bool = False, n_jobs: int = -1, on_error: str = 'continue', keep_raw: bool = True) -> BatchResult
Run the pipeline on multiple inputs and return a result per input.
Each input can be:
- A ProcessingContext, passed directly as initial_context.
- A file path string; a fresh Loader is created automatically for each path via loader_factory.
Note
Do not add a Loader processor to the pipeline when using map(). Loading is handled outside the pipeline so that each file gets its own isolated loader instance.
- Parameters:
inputs – List of file paths or ProcessingContext objects.
loader_factory – Callable[[path], Processor] that creates a fresh loader for each path string. Defaults to lambda p: Loader(path=p, preload=True).
parallel – Whether to pass parallel=True to each pipeline.run().
n_jobs – Passed through to pipeline.run().
on_error – "continue" (default): log failures and keep going; "raise": re-raise the first error encountered.
keep_raw – If False, the Raw data is released from each result after the pipeline run completes, keeping only metrics and history in memory. Set to False when processing many files and you only need summary statistics. Defaults to True.
- Returns:
BatchResult containing one PipelineResult per input, in the same order. It behaves like a plain list but also offers BatchResult.print_summary() and BatchResult.summary_df.
Example:
    pipeline = Pipeline([
        TriggerDetector(regex=r"\b1\b"),
        UpSample(factor=10),
        AASCorrection(window_size=30),
        DownSample(factor=10),
        SNRCalculator(),
    ])
    results = pipeline.map(
        ["sub-01.edf", "sub-02.edf", "sub-03.edf"],
        keep_raw=False,
    )
    results.print_summary()
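The two on_error modes differ only in what happens when one input fails. The sketch below shows that control flow in isolation, under stated assumptions: run_batch and fake_run are hypothetical helpers, results are plain tuples rather than PipelineResult objects, and nothing here calls FACETpy.

```python
from typing import Any, Callable, List, Optional, Tuple

# (label, value, error): exactly one of value/error is set per input.
Result = Tuple[str, Optional[Any], Optional[Exception]]


def run_batch(
    inputs: List[str],
    run_one: Callable[[str], Any],
    on_error: str = "continue",
) -> List[Result]:
    """Apply run_one to each input, keeping results in input order."""
    if on_error not in ("continue", "raise"):
        raise ValueError(f"unknown on_error mode: {on_error!r}")
    results: List[Result] = []
    for item in inputs:
        try:
            results.append((item, run_one(item), None))
        except Exception as exc:
            if on_error == "raise":
                raise  # stop at the first failure
            results.append((item, None, exc))  # record and keep going
    return results


def fake_run(path: str) -> int:
    # Hypothetical per-file work: fails for one input on purpose.
    if "bad" in path:
        raise RuntimeError(f"cannot process {path}")
    return len(path)


results = run_batch(["sub-01.edf", "bad.edf", "sub-03.edf"], fake_run)
```

With "continue", a failed input still occupies its slot, so results stay aligned with the input list.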
- class facet.ProcessingContext(raw: Raw, raw_original: Raw | None = None, metadata: ProcessingMetadata | None = None)
Bases: object
Processing context that wraps MNE Raw objects and provides metadata.
This class serves as the primary data container passed between processors. It provides:
- Access to current and original MNE Raw objects
- Metadata storage (triggers, parameters, etc.)
- Processing history tracking
- Estimated noise tracking
- Immutable operations (processors return new contexts)
- _raw
Current processed MNE Raw object
- _raw_original
Original unprocessed MNE Raw object
- _metadata
Processing metadata
- _history
List of processing steps
- _estimated_noise
Accumulated noise estimates
- _cache
Cache for computed values
- __init__(raw: Raw, raw_original: Raw | None = None, metadata: ProcessingMetadata | None = None)
Initialize processing context.
- Parameters:
raw – MNE Raw object
raw_original – Original Raw object (if None, copies raw)
metadata – Processing metadata (if None, creates empty)
- get_data(picks: Any | None = None, **kwargs) -> ndarray
Get data from Raw object.
- Parameters:
picks – Channel picks (MNE format)
**kwargs – Additional arguments for Raw.get_data(copy=False)
- Returns:
Data array
- property metadata: ProcessingMetadata
Get processing metadata.
- get_metric(name: str, default=None)
Return a single evaluation metric stored in the context.
Shortcut for the common pattern:
    ctx.metadata.custom.get('metrics', {}).get(name, default)
Typically used inside a ConditionalProcessor condition function after an evaluation step has run.
- Parameters:
name – Metric name (e.g. 'snr', 'rms_ratio').
default – Value returned when the metric is absent.
Example:
    def needs_extra_correction(ctx):
        return ctx.get_metric('snr', float('inf')) < 10
- get_history() -> list[ProcessingStep]
Get processing history.
- add_history_entry(name: str | None = None, processor_type: str = '', parameters: dict[str, Any] | None = None, *, processor_name: str | None = None) -> None
Add entry to processing history.
- with_raw(raw: Raw, copy_metadata: bool = True) -> ProcessingContext
Create new context with updated Raw object.
- Parameters:
raw – New Raw object
copy_metadata – Whether to copy metadata from current context
- Returns:
New ProcessingContext
- with_metadata(metadata: ProcessingMetadata) -> ProcessingContext
Create new context with updated metadata.
- Parameters:
metadata – New metadata
- Returns:
New ProcessingContext
- with_triggers(triggers: ndarray) -> ProcessingContext
Create new context with updated triggers.
- Parameters:
triggers – New trigger array
- Returns:
New ProcessingContext
- with_trigger_samples(triggers: ndarray | list[int], *, artifact_length: int | None = None, tr_seconds: float | None = None, trigger_regex: str | None = None, custom: dict[str, Any] | None = None, samples_are_absolute: bool = False) -> ProcessingContext
Return a new context with trigger samples and optional artifact metadata.
- Parameters:
triggers (typing.Iterable of int) – Trigger sample positions.
artifact_length (int, optional) – Artifact window length in samples. If omitted (and tr_seconds is also omitted), FACETpy infers it as the median spacing between trigger samples when at least two triggers are available.
tr_seconds (float, optional) – Artifact window length in seconds. Converted using raw.info['sfreq'].
trigger_regex (str, optional) – Label/pattern stored in metadata for provenance.
custom (dict, optional) – Additional metadata.custom entries to merge in.
samples_are_absolute (bool, optional) – If True, trigger samples are interpreted as absolute MNE sample indices and converted to context-relative indices by subtracting raw.first_samp.
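The three conversions described in these parameters (absolute to relative indices, median-spacing inference, and seconds to samples) are simple arithmetic. The sketch below works through them with hypothetical helper functions on plain lists; sorting the samples first, truncating the median to an int, and rounding the seconds conversion are assumptions made for this illustration and may differ from what FACETpy does.

```python
from statistics import median
from typing import List, Optional, Sequence


def to_relative(samples: Sequence[int], first_samp: int) -> List[int]:
    """Convert absolute sample indices to context-relative ones."""
    return [s - first_samp for s in samples]


def infer_artifact_length(samples: Sequence[int]) -> Optional[int]:
    """Median spacing between consecutive triggers, or None if fewer than two."""
    if len(samples) < 2:
        return None
    ordered = sorted(samples)  # assumption: spacing is measured on sorted samples
    spacings = [b - a for a, b in zip(ordered, ordered[1:])]
    return int(median(spacings))  # assumption: truncate a fractional median


def tr_to_samples(tr_seconds: float, sfreq: float) -> int:
    """Convert a duration in seconds to a sample count at rate sfreq."""
    return int(round(tr_seconds * sfreq))  # assumption: round to nearest sample


absolute = [1100, 1600, 2100, 2590]
relative = to_relative(absolute, first_samp=100)
length = infer_artifact_length(relative)
```

The median is robust to one irregular gap: the spacings here are 500, 500, and 490, and the inferred length is still 500.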
- with_mne_events(events: ndarray, *, event: int | str | None = None, event_id: dict[str, int] | None = None, artifact_length: int | None = None, tr_seconds: float | None = None, store_event_id: bool = True) -> ProcessingContext
Build trigger metadata from MNE events and return a new context.
- Parameters:
events (numpy.ndarray) – MNE-style event array with shape (n_events, 3).
event (int or str, optional) – Event code or event name to select. If None, events must contain exactly one unique code.
event_id (dict, optional) – Mapping from event names to integer codes (from mne.events_from_annotations). Required when event is a str.
artifact_length (int, optional) – Artifact length in samples.
tr_seconds (float, optional) – Artifact length in seconds. Takes precedence over default inference, but cannot be combined with artifact_length.
store_event_id (bool, optional) – If True and event_id is provided, stores it in metadata.custom['event_id'].
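The selection rules for event and event_id can be sketched on a plain list of rows. In an MNE event array the first column is the sample index and the third column is the event code. The function below is a hypothetical illustration of the documented rules, not the FACETpy implementation; the error types and messages are assumptions.

```python
from typing import Dict, List, Optional, Sequence, Union


def select_trigger_samples(
    events: Sequence[Sequence[int]],
    event: Optional[Union[int, str]] = None,
    event_id: Optional[Dict[str, int]] = None,
) -> List[int]:
    """Return the sample indices (column 0) of rows whose code (column 2) matches."""
    codes = {row[2] for row in events}
    if event is None:
        # Without a selector the array must be unambiguous.
        if len(codes) != 1:
            raise ValueError("events contain several codes; pass `event`")
        code = next(iter(codes))
    elif isinstance(event, str):
        # Names can only be resolved through the name-to-code mapping.
        if event_id is None:
            raise ValueError("event_id is required when event is a str")
        code = event_id[event]
    else:
        code = event
    return [row[0] for row in events if row[2] == code]


events = [[100, 0, 1], [200, 0, 2], [300, 0, 1]]
by_code = select_trigger_samples(events, event=1)
by_name = select_trigger_samples(events, event="TR", event_id={"TR": 2})
```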
- copy(deep: bool = False) -> ProcessingContext
Create a copy of the context.
- Parameters:
deep – If True, deep copy Raw object (expensive)
- Returns:
Copied ProcessingContext
- to_dict() -> dict[str, Any]
Serialize context to dictionary (for multiprocessing).
- Returns:
Dictionary representation
- classmethod from_dict(data: dict[str, Any]) -> ProcessingContext
Deserialize context from dictionary.
- Parameters:
data – Dictionary representation
- Returns:
ProcessingContext instance
- __or__(other) -> ProcessingContext
Apply a processor or callable to this context using the pipe operator.
Enables a clean chaining syntax outside of a Pipeline:
    ctx = ProcessingContext(raw)
    result = (
        ctx
        | HighPassFilter(1.0)
        | UpSample(10)
        | TriggerDetector(r"\b1\b")
        | AASCorrection()
    )
    filtered_raw = result.get_raw()
- Parameters:
other – A Processor instance or any Callable[[ProcessingContext], ProcessingContext].
- Returns:
New ProcessingContext produced by applying other.
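The pipe syntax relies on Python's __or__ hook: ctx | step calls ctx.__or__(step). The sketch below shows the mechanism on a hypothetical immutable ToyContext with two invented steps (Scale and demean). It treats every step as a plain callable, which is an assumption made for brevity; how FACETpy invokes a Processor internally is not shown here.

```python
from dataclasses import dataclass, replace
from typing import Callable, Tuple


@dataclass(frozen=True)
class ToyContext:
    """Hypothetical immutable container standing in for a processing context."""

    values: Tuple[float, ...]
    history: Tuple[str, ...] = ()

    def __or__(self, other: Callable[["ToyContext"], "ToyContext"]) -> "ToyContext":
        # Anything callable that maps a context to a context can be piped.
        if not callable(other):
            return NotImplemented
        return other(self)


class Scale:
    """Invented step: multiply every value by a constant factor."""

    def __init__(self, factor: float):
        self.factor = factor

    def __call__(self, ctx: ToyContext) -> ToyContext:
        scaled = tuple(v * self.factor for v in ctx.values)
        return replace(ctx, values=scaled, history=ctx.history + (f"Scale({self.factor})",))


def demean(ctx: ToyContext) -> ToyContext:
    """Invented step: subtract the mean from every value."""
    mean = sum(ctx.values) / len(ctx.values)
    centered = tuple(v - mean for v in ctx.values)
    return replace(ctx, values=centered, history=ctx.history + ("demean",))


start = ToyContext((1.0, 2.0, 3.0))
result = start | Scale(2.0) | demean
```

Each step returns a new object, so start is left untouched and the chain reads left to right in execution order.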
- class facet.BatchResult(results: list[PipelineResult], labels: list[str] | None = None)
Bases: object
Result of Pipeline.map(...): a list of PipelineResult objects with built-in helpers for quick inspection.
It behaves like a regular list (iteration, indexing, len), so existing code that iterates over the return value of Pipeline.map() continues to work without changes.
Example:
    results = pipeline.map(files, loader_factory=lambda p: Loader(p))
    results.print_summary()
    df = results.summary_df
- print_summary() -> None
Print a formatted table with one row per input file.
Columns include the file label, success/failure status, execution time, and any scalar metrics that were computed.
Example:
    results = pipeline.map(files, loader_factory=...)
    results.print_summary()
- property summary_df
Return a pandas.DataFrame with one row per input.
Columns: file, success, execution_time, plus one column per scalar metric. Returns None when pandas is not installed.