Top-Level API

The facet package re-exports the most commonly used classes and helper functions.

Convenience Functions

facet.load(path: str, **kwargs) ProcessingContext[source]

Load an EEG file and return a ProcessingContext.

Automatically detects the file format from the extension (EDF, BDF, GDF, BrainVision, EEGLAB, FIF). For BIDS datasets, use BIDSLoader.

Parameters:
  • path (str) – Path to the EEG data file.

  • **kwargs – Additional keyword arguments forwarded to Loader (e.g. preload, artifact_to_trigger_offset, bad_channels).

Returns:

Context containing the loaded recording.

Return type:

facet.core.ProcessingContext

Examples

>>> import facet
>>> ctx = facet.load("./data/subject01.edf", preload=True)
>>> ctx = ctx | facet.HighPassFilter(1.0) | facet.TriggerDetector(r"\b1\b")
facet.export(context: ProcessingContext, path: str, **kwargs) ProcessingContext[source]

Export a ProcessingContext using extension-based auto routing.

Uses Exporter to dispatch export handling from path extension (for example .edf, .set, .vhdr).

Parameters:
  • context (facet.core.ProcessingContext) – Input context that contains the Raw data to export.

  • path (str) – Destination path; extension selects the exporter implementation.

  • **kwargs – Additional keyword arguments forwarded to Exporter (for example overwrite).

Returns:

The unchanged input context after export.

Return type:

facet.core.ProcessingContext

Examples

>>> import facet
>>> ctx = facet.load("./data/subject01.edf", preload=True)
>>> ctx = facet.export(ctx, "./data/subject01_corrected.set", overwrite=True)
facet.create_standard_pipeline(input_path: str, output_path: str, start_sample: int = 0, end_sample: int = None, trigger_regex: str = '\\b1\\b', artifact_to_trigger_offset: float = -0.005, upsample_factor: int = 10, use_anc: bool = True, use_pca: bool = True, evaluate: bool = False, plot: bool = False, plot_kwargs: dict | None = None) Pipeline[source]

Create a standard fMRI artifact correction pipeline.

Parameters:
  • input_path – Path to input EEG file

  • output_path – Path to output corrected file

  • trigger_regex – Regex pattern for trigger detection

  • artifact_to_trigger_offset – Time offset between artifact and trigger (seconds)

  • upsample_factor – Upsampling factor for alignment

  • use_anc – Whether to apply ANC correction

  • use_pca – Whether to apply PCA correction

  • evaluate – Append all standard evaluation metrics (SNR, RMS, Median, FFT) and a MetricsReport step automatically.

  • plot – Append a RawPlotter step after evaluation. Implies evaluate=True.

  • plot_kwargs – Extra keyword arguments forwarded to RawPlotter.

Returns:

Configured Pipeline instance

Example:

# Minimal — just correction + export
pipeline = create_standard_pipeline("data.edf", "corrected.edf")
result = pipeline.run()

# With automatic evaluation
pipeline = create_standard_pipeline(
    "data.edf",
    "corrected.edf",
    evaluate=True,
)
result = pipeline.run()
print(result.metrics)

# With evaluation and interactive plot
pipeline = create_standard_pipeline(
    "data.edf",
    "corrected.edf",
    evaluate=True,
    plot=True,
    plot_kwargs={"channel": "Fp1", "start": 5.0, "duration": 10.0},
)
result = pipeline.run()

Configuration

facet.get_config(key: str | None = None) Any[source]

Return the resolved FACETpy configuration.

Parameters:

key (str, optional) – Optional key to retrieve. If None, the full config dict is returned.

facet.set_config(config: Mapping[str, Any] | None = None, /, *, apply_logging: bool = True, **kwargs) dict[str, Any][source]

Set in-process runtime config overrides (highest precedence).

Parameters:
  • config (mapping, optional) – Optional mapping with config keys/values.

  • apply_logging (bool) – Reconfigure FACETpy logging immediately after applying overrides.

  • **kwargs – Additional key/value pairs (merged with config).

facet.reset_config(*, apply_logging: bool = True) dict[str, Any][source]

Clear runtime overrides and return the resolved config.

Commonly Used Re-Exports

Core

class facet.Pipeline(processors: list[Processor | Callable], name: str | None = None)[source]

Bases: object

Pipeline for executing sequences of processors.

A pipeline orchestrates the execution of multiple processors in sequence, handles errors, provides progress tracking, and supports parallelization.

Example:

pipeline = Pipeline([
    Loader("data.edf"),
    HighPassFilter(freq=1.0),
    UpSample(factor=10),
    TriggerDetector(regex=r"\btrigger\b"),
    AASCorrection(),
    EDFExporter("output.edf")
])

result = pipeline.run()
if result.was_successful():
    print(f"Completed in {result.execution_time:.2f}s")
processors

List of processors to execute

name

Optional pipeline name

__init__(processors: list[Processor | Callable], name: str | None = None)[source]

Initialize pipeline.

Plain callables (Callable[[ProcessingContext], ProcessingContext]) are automatically wrapped in a LambdaProcessor so they can be used as inline steps without ceremony:

pipeline = Pipeline([
    Loader("data.edf"),
    HighPassFilter(1.0),
    lambda ctx: (print(ctx.get_sfreq()) or ctx),
    AASCorrection(),
])
Parameters:
  • processors – List of Processor instances or plain callables to execute in order.

  • name – Optional pipeline name (for logging)

run(initial_context: ProcessingContext | None = None, parallel: bool = False, n_jobs: int = -1, channel_sequential: bool = False, show_progress: bool = True) PipelineResult[source]

Execute the pipeline.

Parameters:
  • initial_context – Initial context (if None, first processor creates it)

  • parallel – Enable parallel execution for compatible processors

  • n_jobs – Number of parallel jobs (-1 for all CPUs)

  • channel_sequential

    Run consecutive channel-wise processors (channel_wise = True) as a single per-channel pass. For each channel the full local sequence executes before the next channel starts:

    for each channel:
        channel → HP-filter → UpSample → AAS → DownSample → store
    

    The output array is pre-allocated at the final sfreq so the full high-sfreq intermediate data never exists for all channels at once.

    Processors with run_once = True (e.g. TriggerAligner) are included in the per-channel pass but only execute for the first channel; all subsequent channels skip them.

    This flag is independent of parallel_safe and has no relation to multiprocessing. Takes precedence over parallel for eligible processors.

  • show_progress – Show progress bar

Returns:

PipelineResult containing final context and metadata

add(processor: Processor | Callable) Pipeline[source]

Add a processor or callable to the pipeline (fluent API).

Parameters:

processorProcessor instance or callable.

Returns:

Self for chaining

extend(processors: list[Processor | Callable]) Pipeline[source]

Extend pipeline with multiple processors or callables.

Parameters:

processors – List of processors or callables to add.

Returns:

Self for chaining

insert(index: int, processor: Processor | Callable) Pipeline[source]

Insert a processor or callable at a specific position.

Parameters:
  • index – Position to insert

  • processorProcessor instance or callable.

Returns:

Self for chaining

remove(index: int) Pipeline[source]

Remove processor at index.

Parameters:

index – Index to remove

Returns:

Self for chaining

validate_all(context: ProcessingContext) list[str][source]

Validate all processors against a context.

Useful for checking if a pipeline can run before actually running it.

Parameters:

context – Context to validate against

Returns:

List of validation error messages (empty if all valid)

describe() str[source]

Get human-readable pipeline description.

Returns:

Multi-line string describing pipeline

to_dict() dict[str, Any][source]

Serialize pipeline to dictionary.

Returns:

Dictionary representation

map(inputs: list[str | ProcessingContext], loader_factory: Callable[[str], Processor] | None = None, parallel: bool = False, n_jobs: int = -1, on_error: str = 'continue', keep_raw: bool = True) BatchResult[source]

Run the pipeline on multiple inputs and return a result per input.

Each input can be:

  • A ProcessingContext — passed directly as initial_context.

  • A file path string — a fresh Loader is created automatically for each path via loader_factory.

Note

Do not add a Loader processor to the pipeline when using map(). Loading is handled outside the pipeline so that each file gets its own isolated loader instance.

Parameters:
  • inputs – List of file paths or ProcessingContext objects.

  • loader_factoryCallable[[path], Processor] that creates a fresh loader for each path string. Defaults to lambda p: Loader(path=p, preload=True).

  • parallel – Whether to pass parallel=True to each pipeline.run().

  • n_jobs – Passed through to pipeline.run().

  • on_error"continue" (default) — log failures and keep going; "raise" — re-raise the first error encountered.

  • keep_raw – If False, the Raw data is released from each result after the pipeline run completes, keeping only metrics and history in memory. Set to False when processing many files and you only need summary statistics. Defaults to True.

Returns:

BatchResult containing one PipelineResult per input, in the same order. It behaves like a plain list but also offers BatchResult.print_summary() and BatchResult.summary_df.

Example:

pipeline = Pipeline([
    TriggerDetector(regex=r"\b1\b"),
    UpSample(factor=10),
    AASCorrection(window_size=30),
    DownSample(factor=10),
    SNRCalculator(),
])

results = pipeline.map(
    ["sub-01.edf", "sub-02.edf", "sub-03.edf"],
    keep_raw=False,
)
results.print_summary()
__len__() int[source]

Get number of processors.

__getitem__(index: int) Processor[source]

Get processor by index.

__repr__() str[source]

String representation.

class facet.ProcessingContext(raw: Raw, raw_original: Raw | None = None, metadata: ProcessingMetadata | None = None)[source]

Bases: object

Processing context that wraps MNE Raw objects and provides metadata.

This class serves as the primary data container passed between processors. It provides: - Access to current and original MNE Raw objects - Metadata storage (triggers, parameters, etc.) - Processing history tracking - Estimated noise tracking - Immutable operations (processors return new contexts)

_raw

Current processed MNE Raw object

_raw_original

Original unprocessed MNE Raw object

_metadata

Processing metadata

_history

List of processing steps

_estimated_noise

Accumulated noise estimates

_cache

Cache for computed values

__init__(raw: Raw, raw_original: Raw | None = None, metadata: ProcessingMetadata | None = None)[source]

Initialize processing context.

Parameters:
  • raw – MNE Raw object

  • raw_original – Original Raw object (if None, copies raw)

  • metadata – Processing metadata (if None, creates empty)

get_raw() Raw[source]

Get current processed Raw object.

get_raw_original() Raw[source]

Get original unprocessed Raw object.

has_raw() bool[source]

Check if Raw object exists.

get_data(picks: Any | None = None, **kwargs) ndarray[source]

Get data from Raw object.

Parameters:
  • picks – Channel picks (MNE format)

  • **kwargs – Additional arguments for Raw.get_data(copy=False)

Returns:

Data array

property metadata: ProcessingMetadata

Get processing metadata.

get_triggers() ndarray | None[source]

Get trigger positions.

has_triggers() bool[source]

Check if triggers exist.

get_artifact_length() int | None[source]

Get artifact length in samples.

get_sfreq() float[source]

Get sampling frequency.

get_n_channels() int[source]

Get number of channels.

get_channel_names() list[str][source]

Get channel names.

get_metric(name: str, default=None)[source]

Return a single evaluation metric stored in the context.

Shortcut for the common pattern:

ctx.metadata.custom.get('metrics', {}).get(name, default)

Typically used inside a ConditionalProcessor condition function after an evaluation step has run.

Parameters:
  • name – Metric name (e.g. 'snr', 'rms_ratio').

  • default – Value returned when the metric is absent.

Example:

def needs_extra_correction(ctx):
    return ctx.get_metric('snr', float('inf')) < 10
get_estimated_noise() ndarray | None[source]

Get accumulated noise estimates.

has_estimated_noise() bool[source]

Check if noise estimates exist.

set_estimated_noise(noise: ndarray) None[source]

Set estimated noise (mutable operation).

accumulate_noise(noise: ndarray) None[source]

Add to accumulated noise estimates.

get_history() list[ProcessingStep][source]

Get processing history.

add_history_entry(name: str | None = None, processor_type: str = '', parameters: dict[str, Any] | None = None, *, processor_name: str | None = None) None[source]

Add entry to processing history.

cache_get(key: str) Any | None[source]

Get value from cache.

cache_set(key: str, value: Any) None[source]

Set value in cache.

cache_has(key: str) bool[source]

Check if cache has key.

cache_clear() None[source]

Clear cache.

with_raw(raw: Raw, copy_metadata: bool = True) ProcessingContext[source]

Create new context with updated Raw object.

Parameters:
  • raw – New Raw object

  • copy_metadata – Whether to copy metadata from current context

Returns:

New ProcessingContext

with_metadata(metadata: ProcessingMetadata) ProcessingContext[source]

Create new context with updated metadata.

Parameters:

metadata – New metadata

Returns:

New ProcessingContext

with_triggers(triggers: ndarray) ProcessingContext[source]

Create new context with updated triggers.

Parameters:

triggers – New trigger array

Returns:

New ProcessingContext

with_trigger_samples(triggers: ndarray | list[int], *, artifact_length: int | None = None, tr_seconds: float | None = None, trigger_regex: str | None = None, custom: dict[str, Any] | None = None, samples_are_absolute: bool = False) ProcessingContext[source]

Return a new context with trigger samples and optional artifact metadata.

Parameters:
  • triggers (typing.Iterable of int) – Trigger sample positions.

  • artifact_length (int, optional) – Artifact window length in samples. If omitted (and tr_seconds is also omitted), FACETpy infers it as the median spacing between trigger samples when at least two triggers are available.

  • tr_seconds (float, optional) – Artifact window length in seconds. Converted using raw.info['sfreq'].

  • trigger_regex (str, optional) – Label/pattern stored in metadata for provenance.

  • custom (dict, optional) – Additional metadata.custom entries to merge in.

  • samples_are_absolute (bool, optional) – If True, trigger samples are interpreted as absolute MNE sample indices and converted to context-relative indices by subtracting raw.first_samp.

with_mne_events(events: ndarray, *, event: int | str | None = None, event_id: dict[str, int] | None = None, artifact_length: int | None = None, tr_seconds: float | None = None, store_event_id: bool = True) ProcessingContext[source]

Build trigger metadata from MNE events and return a new context.

Parameters:
  • events (numpy.ndarray) – MNE-style event array with shape (n_events, 3).

  • event (int or str, optional) – Event code or event name to select. If None, events must contain exactly one unique code.

  • event_id (dict, optional) – Mapping from event names to integer codes (from mne.events_from_annotations). Required when event is a str.

  • artifact_length (int, optional) – Artifact length in samples.

  • tr_seconds (float, optional) – Artifact length in seconds. Takes precedence over default inference, but cannot be combined with artifact_length.

  • store_event_id (bool, optional) – If True and event_id is provided, stores it in metadata.custom['event_id'].

copy(deep: bool = False) ProcessingContext[source]

Create a copy of the context.

Parameters:

deep – If True, deep copy Raw object (expensive)

Returns:

Copied ProcessingContext

to_dict() dict[str, Any][source]

Serialize context to dictionary (for multiprocessing).

Returns:

Dictionary representation

classmethod from_dict(data: dict[str, Any]) ProcessingContext[source]

Deserialize context from dictionary.

Parameters:

data – Dictionary representation

Returns:

ProcessingContext instance

__or__(other) ProcessingContext[source]

Apply a processor or callable to this context using the pipe operator.

Enables a clean chaining syntax outside of a Pipeline:

ctx = ProcessingContext(raw)
result = (
    ctx
    | HighPassFilter(1.0)
    | UpSample(10)
    | TriggerDetector(r"\b1\b")
    | AASCorrection()
)
filtered_raw = result.get_raw()
Parameters:

other – A Processor instance or any Callable[[ProcessingContext], ProcessingContext].

Returns:

New ProcessingContext produced by applying other.

__repr__() str[source]

String representation.

class facet.BatchResult(results: list[PipelineResult], labels: list[str] | None = None)[source]

Bases: object

Result of Pipeline.map(...) - a list of PipelineResult objects with built-in helpers for quick inspection.

It behaves like a regular list (iteration, indexing, len), so existing code that iterates over the return value of Pipeline.map() continues to work without changes.

Example:

results = pipeline.map(files, loader_factory=lambda p: Loader(p))
results.print_summary()
df = results.summary_df
__init__(results: list[PipelineResult], labels: list[str] | None = None)[source]
print_summary() None[source]

Print a formatted table with one row per input file.

Columns include the file label, success/failure status, execution time, and any scalar metrics that were computed.

Example:

results = pipeline.map(files, loader_factory=...)
results.print_summary()
property summary_df

Return a pandas.DataFrame with one row per input.

Columns: file, success, execution_time, plus one column per scalar metric. Returns None when pandas is not installed.

Errors

class facet.ProcessorError[source]

Bases: Exception

Base exception for all processor-related errors.

class facet.PipelineError[source]

Bases: Exception

Base exception for pipeline-related errors.