Core API

The core module provides the fundamental building blocks of FACETpy’s architecture.

Base Classes

Processor

class facet.core.Processor[source]

Bases: ABC

Base class for all processors in the pipeline.

Processors are the fundamental building blocks of FACETpy pipelines. Each processor:

  • Takes a ProcessingContext as input

  • Performs a specific operation

  • Returns a new ProcessingContext (immutable by default)

  • Can validate prerequisites before processing

  • Tracks its execution in the context history

Subclasses implement:

  • process(): Main processing logic (required)

  • validate(): Prerequisite checks (optional)

Example:

class HighPassFilter(Processor):
    name = "highpass_filter"

    def __init__(self, freq: float):
        self.freq = freq

    def validate(self, context: ProcessingContext) -> None:
        if not context.has_raw():
            raise ProcessorValidationError("No raw data available")

    def process(self, context: ProcessingContext) -> ProcessingContext:
        raw = context.get_raw().copy()
        raw.filter(l_freq=self.freq, h_freq=None)
        return context.with_raw(raw)
name: str = 'base_processor'
description: str = 'Base processor'
version: str = '1.0.0'
requires_triggers: bool = False
requires_raw: bool = True
modifies_raw: bool = True
parallel_safe: bool = True
channel_wise: bool = False
run_once: bool = False
__init__()[source]

Initialize processor.

__call__(context: ProcessingContext) ProcessingContext[source]

Make processor callable.

Parameters:

context – Input processing context

Returns:

Output processing context

execute(context: ProcessingContext) ProcessingContext[source]

Execute the processor with validation and history tracking.

Parameters:

context – Input processing context

Returns:

Output processing context

Raises:

ProcessorValidationError – If validation fails

validate(context: ProcessingContext) None[source]

Validate that prerequisites are met.

Override this method to add custom validation logic.

Parameters:

context – Processing context

Raises:

ProcessorValidationError – If validation fails

abstract process(context: ProcessingContext) ProcessingContext[source]

Process the context.

This is the main method to implement in subclasses.

Parameters:

context – Input processing context

Returns:

Output processing context. If None is returned, the input context is used (no-op behavior).
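
The relationship between execute(), validate(), and process() can be pictured with a small stand-in. The sketch below does not import FACETpy: MiniContext, MiniProcessor, Scale, and Inspect are hypothetical names. The ordering shown (validate, then process, then a history entry, with a None result falling back to the input context) is an assumption drawn from the descriptions above, not the library's actual implementation.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class MiniContext:
    """Hypothetical stand-in for ProcessingContext (not the FACETpy class)."""
    data: tuple
    history: tuple = ()


class MiniProcessor:
    """Hypothetical stand-in for Processor."""
    name = "base_processor"

    def validate(self, context):
        # Default: no prerequisites to check.
        return None

    def process(self, context):
        raise NotImplementedError

    def execute(self, context):
        self.validate(context)            # may raise before any work is done
        result = self.process(context)
        if result is None:                # assumed no-op fallback
            result = context
        # Record the step on a new context instead of mutating the input.
        return replace(result, history=result.history + (self.name,))


class Scale(MiniProcessor):
    name = "scale"

    def __init__(self, factor):
        self.factor = factor

    def validate(self, context):
        if not context.data:
            raise ValueError("No data available")

    def process(self, context):
        return replace(context, data=tuple(x * self.factor for x in context.data))


class Inspect(MiniProcessor):
    name = "inspect"

    def process(self, context):
        return None  # returns nothing, so execute() keeps the input context


ctx = MiniContext(data=(1.0, 2.0))
out = Inspect().execute(Scale(2.0).execute(ctx))
```

Because the stand-in context is frozen, every step has to build a new object, which mirrors the "returns a new ProcessingContext" contract described for processors.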

__repr__() str[source]

String representation.

ProcessingContext

class facet.core.ProcessingContext(raw: Raw, raw_original: Raw | None = None, metadata: ProcessingMetadata | None = None)[source]

Bases: object

Processing context that wraps MNE Raw objects and provides metadata.

This class serves as the primary data container passed between processors. It provides:

  • Access to current and original MNE Raw objects

  • Metadata storage (triggers, parameters, etc.)

  • Processing history tracking

  • Estimated noise tracking

  • Immutable operations (processors return new contexts)

_raw

Current processed MNE Raw object

_raw_original

Original unprocessed MNE Raw object

_metadata

Processing metadata

_history

List of processing steps

_estimated_noise

Accumulated noise estimates

_cache

Cache for computed values

__init__(raw: Raw, raw_original: Raw | None = None, metadata: ProcessingMetadata | None = None)[source]

Initialize processing context.

Parameters:
  • raw – MNE Raw object

  • raw_original – Original Raw object (if None, copies raw)

  • metadata – Processing metadata (if None, creates empty)

get_raw() Raw[source]

Get current processed Raw object.

get_raw_original() Raw[source]

Get original unprocessed Raw object.

has_raw() bool[source]

Check if Raw object exists.

get_data(picks: Any | None = None, **kwargs) ndarray[source]

Get data from Raw object.

Parameters:
  • picks – Channel picks (MNE format)

  • **kwargs – Additional arguments for Raw.get_data(copy=False)

Returns:

Data array

property metadata: ProcessingMetadata

Get processing metadata.

get_triggers() ndarray | None[source]

Get trigger positions.

has_triggers() bool[source]

Check if triggers exist.

get_artifact_length() int | None[source]

Get artifact length in samples.

get_sfreq() float[source]

Get sampling frequency.

get_n_channels() int[source]

Get number of channels.

get_channel_names() list[str][source]

Get channel names.

get_metric(name: str, default=None)[source]

Return a single evaluation metric stored in the context.

Shortcut for the common pattern:

ctx.metadata.custom.get('metrics', {}).get(name, default)

Typically used inside a ConditionalProcessor condition function after an evaluation step has run.

Parameters:
  • name – Metric name (e.g. 'snr', 'rms_ratio').

  • default – Value returned when the metric is absent.

Example:

def needs_extra_correction(ctx):
    return ctx.get_metric('snr', float('inf')) < 10
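
The lookup that get_metric() abbreviates can be tried out on a stand-in object. This is a sketch only: the SimpleNamespace objects below are hypothetical substitutes for a real context, and the free function get_metric simply restates the documented expression.

```python
from types import SimpleNamespace


def get_metric(ctx, name, default=None):
    """Restates the documented shortcut as a free function."""
    return ctx.metadata.custom.get("metrics", {}).get(name, default)


def needs_extra_correction(ctx):
    # A missing SNR defaults to +inf, so the condition stays False
    # until an evaluation step has stored a value.
    return get_metric(ctx, "snr", float("inf")) < 10


# Hypothetical stand-ins for a context before and after an evaluation step.
fresh = SimpleNamespace(metadata=SimpleNamespace(custom={}))
evaluated = SimpleNamespace(
    metadata=SimpleNamespace(custom={"metrics": {"snr": 7.5}})
)
```

Choosing float('inf') as the default means a context without metrics never triggers the extra correction, which is usually the safer behavior for a condition function.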
get_estimated_noise() ndarray | None[source]

Get accumulated noise estimates.

has_estimated_noise() bool[source]

Check if noise estimates exist.

set_estimated_noise(noise: ndarray) None[source]

Set the estimated noise. Unlike the with_* methods, this modifies the context in place.

accumulate_noise(noise: ndarray) None[source]

Add to accumulated noise estimates.

get_history() list[ProcessingStep][source]

Get processing history.

add_history_entry(name: str | None = None, processor_type: str = '', parameters: dict[str, Any] | None = None, *, processor_name: str | None = None) None[source]

Add entry to processing history.

cache_get(key: str) Any | None[source]

Get value from cache.

cache_set(key: str, value: Any) None[source]

Set value in cache.

cache_has(key: str) bool[source]

Check if cache has key.

cache_clear() None[source]

Clear cache.

with_raw(raw: Raw, copy_metadata: bool = True) ProcessingContext[source]

Create new context with updated Raw object.

Parameters:
  • raw – New Raw object

  • copy_metadata – Whether to copy metadata from current context

Returns:

New ProcessingContext

with_metadata(metadata: ProcessingMetadata) ProcessingContext[source]

Create new context with updated metadata.

Parameters:

metadata – New metadata

Returns:

New ProcessingContext

with_triggers(triggers: ndarray) ProcessingContext[source]

Create new context with updated triggers.

Parameters:

triggers – New trigger array

Returns:

New ProcessingContext

with_trigger_samples(triggers: ndarray | list[int], *, artifact_length: int | None = None, tr_seconds: float | None = None, trigger_regex: str | None = None, custom: dict[str, Any] | None = None, samples_are_absolute: bool = False) ProcessingContext[source]

Return a new context with trigger samples and optional artifact metadata.

Parameters:
  • triggers (typing.Iterable of int) – Trigger sample positions.

  • artifact_length (int, optional) – Artifact window length in samples. If omitted (and tr_seconds is also omitted), FACETpy infers it as the median spacing between trigger samples when at least two triggers are available.

  • tr_seconds (float, optional) – Artifact window length in seconds. Converted using raw.info['sfreq'].

  • trigger_regex (str, optional) – Label/pattern stored in metadata for provenance.

  • custom (dict, optional) – Additional metadata.custom entries to merge in.

  • samples_are_absolute (bool, optional) – If True, trigger samples are interpreted as absolute MNE sample indices and converted to context-relative indices by subtracting raw.first_samp.
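
The three conversions described in the parameter list above can be sketched with the standard library alone. The helper names (infer_artifact_length, seconds_to_samples, to_relative) are hypothetical, the rounding behavior is an assumption, and the sketch assumes trigger samples are given in ascending order; FACETpy's own handling may differ.

```python
from statistics import median


def infer_artifact_length(triggers):
    """Median spacing between consecutive trigger samples, or None."""
    if len(triggers) < 2:
        return None  # at least two triggers are needed
    spacings = [b - a for a, b in zip(triggers, triggers[1:])]
    return int(round(median(spacings)))


def seconds_to_samples(tr_seconds, sfreq):
    """Convert a window length in seconds to samples (rounding is an assumption)."""
    return int(round(tr_seconds * sfreq))


def to_relative(triggers, first_samp):
    """Convert absolute MNE sample indices to context-relative indices."""
    return [t - first_samp for t in triggers]


absolute = [1100, 1350, 1600, 1851]
relative = to_relative(absolute, first_samp=1000)
```

The median is robust to a single irregular gap: the last spacing in this sample is 251 samples, yet the inferred length stays at 250.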

with_mne_events(events: ndarray, *, event: int | str | None = None, event_id: dict[str, int] | None = None, artifact_length: int | None = None, tr_seconds: float | None = None, store_event_id: bool = True) ProcessingContext[source]

Build trigger metadata from MNE events and return a new context.

Parameters:
  • events (numpy.ndarray) – MNE-style event array with shape (n_events, 3).

  • event (int or str, optional) – Event code or event name to select. If None, events must contain exactly one unique code.

  • event_id (dict, optional) – Mapping from event names to integer codes (from mne.events_from_annotations). Required when event is a str.

  • artifact_length (int, optional) – Artifact length in samples.

  • tr_seconds (float, optional) – Artifact length in seconds. Takes precedence over default inference, but cannot be combined with artifact_length.

  • store_event_id (bool, optional) – If True and event_id is provided, stores it in metadata.custom['event_id'].
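
The selection rules for event and event_id can be illustrated on a plain list of rows. In an MNE-style event array the first column holds the sample index and the third column the event code. The function name select_trigger_samples and the use of ValueError are assumptions made for this sketch, not FACETpy behavior.

```python
def select_trigger_samples(events, event=None, event_id=None):
    """Pick sample positions (column 0) for one event code (column 2)."""
    codes = {row[2] for row in events}
    if event is None:
        if len(codes) != 1:
            raise ValueError("event is required when several codes are present")
        code = next(iter(codes))
    elif isinstance(event, str):
        if event_id is None:
            raise ValueError("event_id is required when event is a name")
        code = event_id[event]
    else:
        code = event
    return [row[0] for row in events if row[2] == code]


# MNE-style rows: (sample, previous value, event code).
events = [(120, 0, 1), (370, 0, 1), (400, 0, 5), (620, 0, 1)]
by_code = select_trigger_samples(events, event=1)
by_name = select_trigger_samples(
    events, event="slice", event_id={"slice": 1, "pulse": 5}
)
```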

copy(deep: bool = False) ProcessingContext[source]

Create a copy of the context.

Parameters:

deep – If True, deep copy Raw object (expensive)

Returns:

Copied ProcessingContext

to_dict() dict[str, Any][source]

Serialize context to dictionary (for multiprocessing).

Returns:

Dictionary representation

classmethod from_dict(data: dict[str, Any]) ProcessingContext[source]

Deserialize context from dictionary.

Parameters:

data – Dictionary representation

Returns:

ProcessingContext instance

__or__(other) ProcessingContext[source]

Apply a processor or callable to this context using the pipe operator.

Enables a clean chaining syntax outside of a Pipeline:

ctx = ProcessingContext(raw)
result = (
    ctx
    | HighPassFilter(1.0)
    | UpSample(10)
    | TriggerDetector(r"\b1\b")
    | AASCorrection()
)
filtered_raw = result.get_raw()
Parameters:

other – A Processor instance or any Callable[[ProcessingContext], ProcessingContext].

Returns:

New ProcessingContext produced by applying other.
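
How a pipe operator of this kind works can be shown with a small stand-in class. MiniContext, remove_mean, and double are hypothetical names, and the sketch only covers plain callables; it is not the FACETpy implementation.

```python
class MiniContext:
    """Hypothetical stand-in for ProcessingContext."""

    def __init__(self, samples, history=()):
        self.samples = tuple(samples)
        self.history = tuple(history)

    def __or__(self, other):
        # ctx | step calls step(ctx); each step returns a new context.
        if not callable(other):
            return NotImplemented
        return other(self)


def remove_mean(ctx):
    mean = sum(ctx.samples) / len(ctx.samples)
    return MiniContext(
        [x - mean for x in ctx.samples], ctx.history + ("remove_mean",)
    )


def double(ctx):
    return MiniContext([2 * x for x in ctx.samples], ctx.history + ("double",))


start = MiniContext([1.0, 2.0, 3.0])
result = start | remove_mean | double
```

The | operator is left-associative, so the chain is evaluated as (start | remove_mean) | double, and each intermediate result is itself a context that supports the next step.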

__repr__() str[source]

String representation.

ProcessingStep

class facet.core.ProcessingStep(name: str, processor_type: str, parameters: dict[str, Any], timestamp: float)[source]

Bases: object

Record of a processing step.

name: str
processor_type: str
parameters: dict[str, Any]
timestamp: float
__init__(name: str, processor_type: str, parameters: dict[str, Any], timestamp: float) None

ProcessingMetadata

class facet.core.ProcessingMetadata(triggers: ~numpy.ndarray | None = None, trigger_regex: str | None = None, artifact_to_trigger_offset: float = 0.0, acq_start_sample: int | None = None, acq_end_sample: int | None = None, pre_trigger_samples: int | None = None, post_trigger_samples: int | None = None, upsampling_factor: int = 10, artifact_length: int | None = None, slices_per_volume: int | None = None, volume_gaps: bool = False, custom: dict[str, ~typing.Any] = <factory>)[source]

Bases: object

Metadata associated with processing context.

triggers: ndarray | None = None
trigger_regex: str | None = None
artifact_to_trigger_offset: float = 0.0
acq_start_sample: int | None = None
acq_end_sample: int | None = None
pre_trigger_samples: int | None = None
post_trigger_samples: int | None = None
upsampling_factor: int = 10
artifact_length: int | None = None
slices_per_volume: int | None = None
volume_gaps: bool = False
custom: dict[str, Any]
copy() ProcessingMetadata[source]

Create a deep copy of metadata.

to_dict() dict[str, Any][source]

Serialize metadata to dictionary.

classmethod from_dict(data: dict[str, Any]) ProcessingMetadata[source]

Deserialize metadata from dictionary.

__init__(triggers: ~numpy.ndarray | None = None, trigger_regex: str | None = None, artifact_to_trigger_offset: float = 0.0, acq_start_sample: int | None = None, acq_end_sample: int | None = None, pre_trigger_samples: int | None = None, post_trigger_samples: int | None = None, upsampling_factor: int = 10, artifact_length: int | None = None, slices_per_volume: int | None = None, volume_gaps: bool = False, custom: dict[str, ~typing.Any] = <factory>) None

Pipeline

Pipeline

class facet.core.Pipeline(processors: list[Processor | Callable], name: str | None = None)[source]

Bases: object

Pipeline for executing sequences of processors.

A pipeline orchestrates the execution of multiple processors in sequence, handles errors, provides progress tracking, and supports parallelization.

Example:

pipeline = Pipeline([
    Loader("data.edf"),
    HighPassFilter(freq=1.0),
    UpSample(factor=10),
    TriggerDetector(regex=r"\btrigger\b"),
    AASCorrection(),
    EDFExporter("output.edf")
])

result = pipeline.run()
if result.was_successful():
    print(f"Completed in {result.execution_time:.2f}s")
processors

List of processors to execute

name

Optional pipeline name

__init__(processors: list[Processor | Callable], name: str | None = None)[source]

Initialize pipeline.

Plain callables (Callable[[ProcessingContext], ProcessingContext]) are automatically wrapped in a LambdaProcessor so they can be used as inline steps without ceremony:

pipeline = Pipeline([
    Loader("data.edf"),
    HighPassFilter(1.0),
    lambda ctx: (print(ctx.get_sfreq()) or ctx),
    AASCorrection(),
])
Parameters:
  • processors – List of Processor instances or plain callables to execute in order.

  • name – Optional pipeline name (for logging)
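
The wrapping of plain callables can be pictured as a normalization pass over the processor list. Step, FuncStep, and normalize are hypothetical stand-ins, and the generated names (lambda_0, lambda_1, ...) are an assumption; the sketch only illustrates the idea of giving every entry the same interface.

```python
class Step:
    """Hypothetical stand-in for Processor."""
    name = "step"

    def __call__(self, ctx):
        return self.process(ctx)


class FuncStep(Step):
    """Hypothetical stand-in for LambdaProcessor."""

    def __init__(self, name, func):
        self.name = name
        self.func = func

    def process(self, ctx):
        return self.func(ctx)


def normalize(steps):
    """Wrap plain callables so every entry exposes the same interface."""
    wrapped = []
    for i, step in enumerate(steps):
        if isinstance(step, Step):
            wrapped.append(step)
        elif callable(step):
            wrapped.append(FuncStep(f"lambda_{i}", step))
        else:
            raise TypeError(f"Step {i} is neither a processor nor a callable")
    return wrapped


seen = []
# list.append returns None, so (None or ctx) evaluates to ctx,
# the same trick as print(...) or ctx in the example above.
steps = normalize([lambda ctx: (seen.append(ctx["sfreq"]) or ctx)])
out = steps[0]({"sfreq": 5000.0})
```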

run(initial_context: ProcessingContext | None = None, parallel: bool = False, n_jobs: int = -1, channel_sequential: bool = False, show_progress: bool = True) PipelineResult[source]

Execute the pipeline.

Parameters:
  • initial_context – Initial context (if None, first processor creates it)

  • parallel – Enable parallel execution for compatible processors

  • n_jobs – Number of parallel jobs (-1 for all CPUs)

  • channel_sequential

    Run consecutive channel-wise processors (channel_wise = True) as a single per-channel pass. For each channel the full local sequence executes before the next channel starts:

    for each channel:
        channel → HP-filter → UpSample → AAS → DownSample → store
    

    The output array is pre-allocated at the final sfreq so the full high-sfreq intermediate data never exists for all channels at once.

    Processors with run_once = True (e.g. TriggerAligner) are included in the per-channel pass but only execute for the first channel; all subsequent channels skip them.

    This flag is independent of parallel_safe and is unrelated to multiprocessing. For eligible processors it takes precedence over parallel.

  • show_progress – Show progress bar

Returns:

PipelineResult containing final context and metadata
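
The per-channel pass described for channel_sequential can be sketched as a nested loop. Everything here is a simplified assumption: run_channel_sequential and the toy steps are hypothetical, channels are plain lists, and metadata handling is left out.

```python
def run_channel_sequential(channels, steps, run_once_names=()):
    """Run every step on one channel before touching the next channel."""
    output = [None] * len(channels)   # stand-in for the pre-allocated output array
    executed = []                     # log of (channel index, step name)
    for ch_idx, samples in enumerate(channels):
        for name, func in steps:
            if name in run_once_names and ch_idx > 0:
                continue              # run_once steps only execute for the first channel
            samples = func(samples)
            executed.append((ch_idx, name))
        output[ch_idx] = samples      # only the final-rate result is kept
    return output, executed


def upsample(samples):    # naive sample repetition, for illustration only
    return [x for x in samples for _ in range(2)]


def downsample(samples):  # keep every second sample
    return samples[::2]


def align(samples):       # placeholder for a run_once step
    return samples


steps = [("align", align), ("upsample", upsample), ("downsample", downsample)]
output, executed = run_channel_sequential(
    [[1, 2], [3, 4]], steps, run_once_names={"align"}
)
```

At any moment only one channel's upsampled intermediate exists, which is the memory saving the option is meant to provide.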

add(processor: Processor | Callable) Pipeline[source]

Add a processor or callable to the pipeline (fluent API).

Parameters:

processor – Processor instance or callable.

Returns:

Self for chaining

extend(processors: list[Processor | Callable]) Pipeline[source]

Extend pipeline with multiple processors or callables.

Parameters:

processors – List of processors or callables to add.

Returns:

Self for chaining

insert(index: int, processor: Processor | Callable) Pipeline[source]

Insert a processor or callable at a specific position.

Parameters:
  • index – Position to insert

  • processor – Processor instance or callable.

Returns:

Self for chaining

remove(index: int) Pipeline[source]

Remove processor at index.

Parameters:

index – Index to remove

Returns:

Self for chaining

validate_all(context: ProcessingContext) list[str][source]

Validate all processors against a context.

Useful for checking if a pipeline can run before actually running it.

Parameters:

context – Context to validate against

Returns:

List of validation error messages (empty if all valid)
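
Collecting validation failures into a list, rather than stopping at the first one, can be sketched as follows. ValidationError stands in for ProcessorValidationError, the check functions are hypothetical, and the "name: message" format is an assumption about how messages might be labeled.

```python
class ValidationError(Exception):
    """Hypothetical stand-in for ProcessorValidationError."""


def validate_all(validators, context):
    """Collect every validation failure instead of stopping at the first."""
    errors = []
    for name, validate in validators:
        try:
            validate(context)
        except ValidationError as exc:
            errors.append(f"{name}: {exc}")
    return errors


def needs_raw(ctx):
    if ctx.get("raw") is None:
        raise ValidationError("No raw data available")


def needs_triggers(ctx):
    if not ctx.get("triggers"):
        raise ValidationError("No triggers available")


checks = [("highpass_filter", needs_raw), ("aas_correction", needs_triggers)]
problems = validate_all(checks, {"raw": object(), "triggers": []})
```

An empty list means every check passed against the given context.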

describe() str[source]

Get human-readable pipeline description.

Returns:

Multi-line string describing pipeline

to_dict() dict[str, Any][source]

Serialize pipeline to dictionary.

Returns:

Dictionary representation

map(inputs: list[str | ProcessingContext], loader_factory: Callable[[str], Processor] | None = None, parallel: bool = False, n_jobs: int = -1, on_error: str = 'continue', keep_raw: bool = True) BatchResult[source]

Run the pipeline on multiple inputs and return a result per input.

Each input can be:

  • A ProcessingContext — passed directly as initial_context.

  • A file path string — a fresh Loader is created automatically for each path via loader_factory.

Note

Do not add a Loader processor to the pipeline when using map(). Loading is handled outside the pipeline so that each file gets its own isolated loader instance.

Parameters:
  • inputs – List of file paths or ProcessingContext objects.

  • loader_factory – Callable[[path], Processor] that creates a fresh loader for each path string. Defaults to lambda p: Loader(path=p, preload=True).

  • parallel – Whether to pass parallel=True to each pipeline.run().

  • n_jobs – Passed through to pipeline.run().

  • on_error – "continue" (default): log failures and keep going; "raise": re-raise the first error encountered.

  • keep_raw – If False, the Raw data is released from each result after the pipeline run completes, keeping only metrics and history in memory. Set to False when processing many files and you only need summary statistics. Defaults to True.

Returns:

BatchResult containing one PipelineResult per input, in the same order. It behaves like a plain list but also offers BatchResult.print_summary() and BatchResult.summary_df.
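
The two on_error modes amount to a try/except around each run. The sketch below uses hypothetical names (run_batch, fake_pipeline) and returns plain tuples instead of PipelineResult objects; it illustrates the control flow only.

```python
def run_batch(inputs, run_one, on_error="continue"):
    """Run run_one per input; return (input, result, error) in input order."""
    if on_error not in ("continue", "raise"):
        raise ValueError(f"Unknown on_error value: {on_error!r}")
    outcomes = []
    for item in inputs:
        try:
            outcomes.append((item, run_one(item), None))
        except Exception as exc:
            if on_error == "raise":
                raise                           # re-raise the first error encountered
            outcomes.append((item, None, exc))  # record the failure and keep going
    return outcomes


def fake_pipeline(path):
    # Hypothetical stand-in for loading a file and running the pipeline.
    if path.endswith(".bad"):
        raise IOError(f"Cannot read {path}")
    return f"processed {path}"


outcomes = run_batch(["sub-01.edf", "sub-02.bad", "sub-03.edf"], fake_pipeline)
```

With "continue", a failed input still occupies its slot in the result, so positions line up with the input list.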

Example:

pipeline = Pipeline([
    TriggerDetector(regex=r"\b1\b"),
    UpSample(factor=10),
    AASCorrection(window_size=30),
    DownSample(factor=10),
    SNRCalculator(),
])

results = pipeline.map(
    ["sub-01.edf", "sub-02.edf", "sub-03.edf"],
    keep_raw=False,
)
results.print_summary()
__len__() int[source]

Get number of processors.

__getitem__(index: int) Processor[source]

Get processor by index.

__repr__() str[source]

String representation.

PipelineResult

class facet.core.PipelineResult(context: ProcessingContext | None, success: bool = True, error: Exception | None = None, execution_time: float = 0.0, failed_processor: str | None = None, failed_processor_index: int | None = None)[source]

Bases: object

Result of pipeline execution.

Contains the final context and metadata about execution.

__init__(context: ProcessingContext | None, success: bool = True, error: Exception | None = None, execution_time: float = 0.0, failed_processor: str | None = None, failed_processor_index: int | None = None)[source]

Initialize pipeline result.

Parameters:
  • context – Final processing context

  • success – Whether pipeline completed successfully

  • error – Exception if pipeline failed

  • execution_time – Total execution time in seconds

get_context() ProcessingContext[source]

Get final processing context.

get_raw()[source]

Get final raw data (convenience method).

get_history()[source]

Get processing history.

was_successful() bool[source]

Check if pipeline succeeded.

property metrics: dict[str, Any]

Shortcut to evaluation metrics stored in context.

Returns the metrics dict from context.metadata.custom, or an empty dict if no metrics have been calculated yet.

Example:

result = pipeline.run()
print(result.metrics['snr'])
property metrics_df

Return scalar evaluation metrics as a pandas.Series.

Nested dicts (e.g. fft_allen) are flattened with _ separators. Returns None if pandas is not available.
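
Flattening nested metric dicts with _ separators can be done recursively. This is a sketch of the general technique: flatten_metrics is a hypothetical helper, and the sub-keys shown under fft_allen are invented for illustration.

```python
def flatten_metrics(metrics, prefix=""):
    """Flatten nested dicts into a single level, joining keys with '_'."""
    flat = {}
    for key, value in metrics.items():
        name = f"{prefix}_{key}" if prefix else str(key)
        if isinstance(value, dict):
            flat.update(flatten_metrics(value, name))
        else:
            flat[name] = value
    return flat


# Illustrative values only; the sub-keys of fft_allen are invented for this sketch.
metrics = {"snr": 12.3, "fft_allen": {"alpha": 0.8, "beta": 1.1}}
flat = flatten_metrics(metrics)
```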

Example:

result = pipeline.run()
print(result.metrics_df)
metric(name: str, default=None)[source]

Return a single evaluation metric by name.

Shortcut for result.metrics.get(name, default) that avoids having to remember the dict key and provides a clean default.

Parameters:
  • name – Metric name (e.g. 'snr', 'rms_ratio').

  • default – Value returned when the metric is absent.

Example:

snr = result.metric('snr', default=float('nan'))  # NaN if no SNR was calculated
print(f"SNR = {snr:.2f} dB")
print_metrics() None[source]

Print a formatted table of all evaluation metrics.

Uses rich for colour and alignment when available.

Example:

result = pipeline.run()
result.print_metrics()
print_summary() None[source]

Print a one-line summary of the pipeline result.

Shows success/failure, execution time, and any key metrics (SNR, RMS ratio, RMS residual) that were calculated.

Example:

result = pipeline.run()
result.print_summary()
plot(**kwargs)[source]

Plot the corrected data using RawPlotter defaults.

Accepts any keyword arguments supported by RawPlotter.

Example:

result = pipeline.run()
result.plot(channel="Fp1", start=5.0, duration=10.0)
release_raw() None[source]

Release the Raw data held by the context to free memory.

After calling this, get_raw() and plot() will no longer work, but metrics and execution_time remain accessible. Useful when running batch jobs where you only need summary statistics.

BatchResult

class facet.core.BatchResult(results: list[PipelineResult], labels: list[str] | None = None)[source]

Bases: object

Result of Pipeline.map(...) - a list of PipelineResult objects with built-in helpers for quick inspection.

It behaves like a regular list (iteration, indexing, len), so existing code that iterates over the return value of Pipeline.map() continues to work without changes.

Example:

results = pipeline.map(files, loader_factory=lambda p: Loader(p))
results.print_summary()
df = results.summary_df
__init__(results: list[PipelineResult], labels: list[str] | None = None)[source]
print_summary() None[source]

Print a formatted table with one row per input file.

Columns include the file label, success/failure status, execution time, and any scalar metrics that were computed.

Example:

results = pipeline.map(files, loader_factory=...)
results.print_summary()
property summary_df

Return a pandas.DataFrame with one row per input.

Columns: file, success, execution_time, plus one column per scalar metric. Returns None when pandas is not installed.

PipelineBuilder

class facet.core.PipelineBuilder(name: str | None = None)[source]

Bases: object

Fluent builder for constructing pipelines.

Example:

pipeline = (PipelineBuilder()
    .add(Loader("data.edf"))
    .highpass(1.0)
    .upsample(10)
    .detect_triggers(r"\btrigger\b")
    .aas_correction()
    .export_edf("output.edf")
    .build())
__init__(name: str | None = None)[source]

Initialize builder.

Parameters:

name – Optional pipeline name

add(processor: Processor) PipelineBuilder[source]

Add custom processor.

Parameters:

processor – Processor to add

Returns:

Self for chaining

add_if(condition: bool, processor: Processor) PipelineBuilder[source]

Add processor conditionally.

Parameters:
  • condition – Whether to add processor

  • processor – Processor to add

Returns:

Self for chaining
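
The chaining behavior of add() and add_if() follows the usual fluent-builder pattern, sketched here with a hypothetical MiniBuilder that collects strings instead of processors.

```python
class MiniBuilder:
    """Hypothetical stand-in for PipelineBuilder."""

    def __init__(self):
        self._steps = []

    def add(self, step):
        self._steps.append(step)
        return self              # returning self is what enables chaining

    def add_if(self, condition, step):
        # The condition is a plain bool evaluated while building, not at run time.
        return self.add(step) if condition else self

    def build(self):
        return list(self._steps)


use_notch = False
steps = (MiniBuilder()
    .add("highpass")
    .add_if(use_notch, "notch")
    .add("aas")
    .build())
```

Because add_if takes a bool, the decision is fixed when the pipeline is built. A decision that depends on the data at run time calls for a ConditionalProcessor, whose condition receives the context.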

build() Pipeline[source]

Build the pipeline.

Returns:

Constructed Pipeline instance

Composite Processors

SequenceProcessor

class facet.core.SequenceProcessor(processors: list[Processor])[source]

Bases: Processor

Processor that executes multiple processors in sequence.

This is a composite processor that allows grouping multiple processing steps into a single unit.

Example:

preprocessing = SequenceProcessor([
    HighPassFilter(freq=1.0),
    UpSample(factor=10),
    NotchFilter(freqs=[50])
])
name: str = 'sequence'
requires_raw: bool = False
__init__(processors: list[Processor])[source]

Initialize sequence processor.

Parameters:

processors – List of processors to execute in order

validate(context: ProcessingContext) None[source]

Validate all child processors.

process(context: ProcessingContext) ProcessingContext[source]

Execute all processors in sequence.

ConditionalProcessor

class facet.core.ConditionalProcessor(condition: callable, processor: Processor, else_processor: Processor | None = None)[source]

Bases: Processor

Processor that executes conditionally based on context.

Example:

ConditionalProcessor(
    condition=lambda ctx: ctx.metadata.custom.get("needs_upsampling"),
    processor=UpSample(factor=10),
    else_processor=None  # Skip if condition is False
)
name: str = 'conditional'
requires_raw: bool = False
__init__(condition: callable, processor: Processor, else_processor: Processor | None = None)[source]

Initialize conditional processor.

Parameters:
  • condition – Callable that takes context and returns bool

  • processor – Processor to execute if condition is True

  • else_processor – Processor to execute if condition is False (optional)

validate(context: ProcessingContext) None[source]

Validate based on condition.

process(context: ProcessingContext) ProcessingContext[source]

Execute conditionally.

SwitchProcessor

class facet.core.SwitchProcessor(selector: callable, cases: dict[str, Processor], default: Processor | None = None)[source]

Bases: Processor

Processor that switches between multiple processors based on selector.

Example:

SwitchProcessor(
    selector=lambda ctx: "motion" if ctx.has_motion_data() else "aas",
    cases={
        "aas": AASCorrection(),
        "motion": MotionBasedCorrection()
    },
    default=AASCorrection()
)
name: str = 'switch'
requires_raw: bool = False
__init__(selector: callable, cases: dict[str, Processor], default: Processor | None = None)[source]

Initialize switch processor.

Parameters:
  • selector – Callable that takes context and returns case key

  • cases – Dictionary mapping case keys to processors

  • default – Default processor if selector returns unknown key
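
Selector-based dispatch with a default maps naturally onto dict.get. In this sketch switch, aas, and motion are hypothetical functions operating on plain dicts, and treating "no match and no default" as a no-op is an assumption rather than documented behavior.

```python
def switch(selector, cases, default=None):
    """Build a step that dispatches on the key returned by selector."""
    def step(ctx):
        chosen = cases.get(selector(ctx), default)
        if chosen is None:
            return ctx           # assumption: no match and no default is a no-op
        return chosen(ctx)
    return step


def aas(ctx):
    return {**ctx, "method": "aas"}


def motion(ctx):
    return {**ctx, "method": "motion"}


correct = switch(
    selector=lambda ctx: "motion" if ctx.get("has_motion") else "aas",
    cases={"aas": aas, "motion": motion},
    default=aas,
)
```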

validate(context: ProcessingContext) None[source]

Validate selected processor.

process(context: ProcessingContext) ProcessingContext[source]

Execute selected processor.

NoOpProcessor

class facet.core.NoOpProcessor[source]

Bases: Processor

Processor that does nothing (useful for testing or placeholders).

name: str = 'noop'
requires_raw: bool = False
modifies_raw: bool = False
process(context: ProcessingContext) ProcessingContext[source]

Return context unchanged.

LambdaProcessor

class facet.core.LambdaProcessor(name: str, func: callable)[source]

Bases: Processor

Processor that executes a lambda function.

Useful for quick custom operations without creating a full processor class.

Example:

LambdaProcessor(
    name="remove_bad_channels",
    func=lambda ctx: ctx.with_raw(
        ctx.get_raw().copy().drop_channels(["EKG"])
    )
)
requires_raw: bool = False
__init__(name: str, func: callable)[source]

Initialize lambda processor.

Parameters:
  • name – Processor name

  • func – Function that takes context and returns new context

process(context: ProcessingContext) ProcessingContext[source]

Execute lambda function.

Registry

facet.core.register_processor(processor_class: type[Processor] | None = None, name: str | None = None, force: bool = False)[source]

Decorator to register a processor.

Can be used with or without arguments.

Example:

@register_processor
class MyProcessor(Processor):
    name = "my_processor"
    ...

@register_processor(name="custom_name")
class MyProcessor(Processor):
    ...
Parameters:
  • processor_class – Processor class (when used without arguments)

  • name – Custom name (overrides class.name)

  • force – Force registration even if name exists

Returns:

Decorator function or processor class

facet.core.get_processor(name: str) type[Processor][source]

Get processor class by name (convenience function).

Parameters:

name – Processor name

Returns:

Processor class

facet.core.list_processors(category: str | None = None) dict[str, type[Processor]][source]

List all registered processors (convenience function).

Parameters:

category – Optional category filter

Returns:

Dictionary of processors

Parallel Execution

ParallelExecutor

class facet.core.ParallelExecutor(n_jobs: int = -1, backend: str = 'multiprocessing', verbose: bool = True)[source]

Bases: object

Executor for parallel processing of channels or epochs.

This class handles multiprocessing for processors that support it, typically for channel-wise or epoch-wise operations.

Example:

executor = ParallelExecutor(n_jobs=4)
result_context = executor.execute(processor, context)

__init__(n_jobs: int = -1, backend: str = 'multiprocessing', verbose: bool = True)[source]

Initialize parallel executor.

Parameters:
  • n_jobs – Number of parallel jobs (-1 for all CPUs, -2 for all but one)

  • backend – Parallel backend ("multiprocessing", "threading", or "serial")

  • verbose – Show progress messages
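
Negative n_jobs values of this kind are commonly resolved relative to the CPU count, as in joblib. The helper below is a hypothetical sketch of that convention; whether FACETpy resolves the value the same way, or rejects 0, is an assumption.

```python
import os


def resolve_n_jobs(n_jobs, n_cpus=None):
    """Translate a possibly negative n_jobs into a worker count of at least 1."""
    if n_jobs == 0:
        raise ValueError("n_jobs must not be 0")
    if n_cpus is None:
        n_cpus = os.cpu_count() or 1
    if n_jobs < 0:
        # -1 -> all CPUs, -2 -> all but one, and so on.
        return max(1, n_cpus + 1 + n_jobs)
    return n_jobs
```

For example, on an 8-core machine resolve_n_jobs(-2, n_cpus=8) yields 7 workers.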

execute(processor: Processor, context: ProcessingContext) ProcessingContext[source]

Execute processor in parallel if possible.

This method attempts to parallelize the processor execution. If parallelization is not applicable, it falls back to serial execution.

Parameters:
  • processor – Processor to execute

  • context – Input context

Returns:

Output context

ChannelSequentialExecutor

class facet.core.ChannelSequentialExecutor[source]

Bases: object

Execute a sequence of processors one channel at a time.

For each data channel the full processor sequence runs to completion before the next channel starts. This ensures that large intermediate representations (e.g. 10x upsampled data) only ever exist for a single channel simultaneously.

Processors with run_once = True execute only for the first channel; all subsequent channels skip them and inherit the metadata produced by that first run.

Non-data channels (stim, misc, …) are handled separately: copied unchanged when the sampling rate stays the same, or resampled via MNE when the batch changes the sampling rate.

When the live console is active, it receives fine-grained per-channel and per-processor progress updates through the console manager.

Example:

executor = ChannelSequentialExecutor()
result = executor.execute(
    [HighPassFilter(1.0), UpSample(10), AASCorrection(), DownSample(10)],
    context,
)
execute(processors: list[Processor], context: ProcessingContext) ProcessingContext[source]

Run processors on context one channel at a time.

Parameters:
  • processors (list of Processor) – Processors to execute in order for every channel.

  • context (ProcessingContext) – Input context containing the full multi-channel dataset.

Returns:

Merged output context with all channels processed.

Return type:

ProcessingContext

Exceptions

class facet.core.ProcessorError[source]

Bases: Exception

Base exception for all processor-related errors.

class facet.core.ProcessorValidationError[source]

Bases: ProcessorError

Raised when processor validation fails.

class facet.core.PipelineError[source]

Bases: Exception

Base exception for pipeline-related errors.