Quick Start

This guide will get you up and running with FACETpy in 5 minutes.

Before running the quickstart pipeline, install FACETpy from PyPI:

pip install facetpy

Strongly recommended: run the build step below to enable fast ANC (the command is the same on all platforms):

python -m facet.build

To run the repository examples, clone the repository and keep using the pip-installed facetpy in your active environment (uv is not required). For contribution workflows, use the source/uv setup described in Contributing to FACETpy.

Basic Correction Pipeline

The simplest way to correct fMRI artifacts:

from facet import create_standard_pipeline

# Create pipeline
pipeline = create_standard_pipeline(
    input_path="my_data.edf",
    output_path="corrected.edf",
    trigger_regex=r"\b1\b",  # Pattern to match triggers
    evaluate=True
)

# Run correction (recommended default for memory efficiency)
result = pipeline.run(channel_sequential=True)

# Check results
if result.success:
    print(f"✓ Correction completed in {result.execution_time:.2f}s")
    metrics = result.context.metadata.custom['metrics']
    print(f"  SNR: {metrics['snr']:.2f}")
    print(f"  RMS improvement: {metrics['rms_ratio']:.2f}x")
else:
    print(f"✗ Failed: {result.error}")

That’s it! Your corrected data is saved to corrected.edf.
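
The trigger_regex pattern r"\b1\b" uses word boundaries, so it matches a standalone 1 but not a 1 embedded in a longer number or label. The sketch below checks a few made-up annotation labels with Python's re module. The labels are hypothetical, and the sketch assumes the pattern is applied with search semantics, which may differ from what FACETpy does internally.

```python
import re

# Pattern from the quickstart: a "1" with a word boundary on both sides.
TRIGGER_PATTERN = re.compile(r"\b1\b")

# Hypothetical annotation labels, for illustration only.
labels = ["1", "S 1", "TR 1 start", "S1", "11", "R128", "21"]

# Assumption: the pattern may match anywhere in the label (search semantics).
matching = [label for label in labels if TRIGGER_PATTERN.search(label)]

print(matching)
```

Labels such as S1 or R128 are not matched, because the 1 is directly attached to another word character. If your recording uses labels like these, adjust the pattern to fit them.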

Understanding the Standard Pipeline

The standard pipeline performs these steps:

  1. Load EDF file

  2. Detect triggers using regex pattern

  3. Cut acquisition window around trigger periods

  4. High-pass filter (1 Hz)

  5. Upsample by a factor of 10 for precise alignment

  6. Align triggers (slice + subsample alignment)

  7. Correct artifacts with AAS (Averaged Artifact Subtraction)

  8. Optional PCA correction (enabled by default if available)

  9. Downsample back to original sampling rate

  10. Restore previously cut acquisition windows

  11. Low-pass filter (70 Hz)

  12. Optional ANC correction (enabled by default if available)

  13. Export corrected data
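
To give an intuition for the AAS step in the list above, here is a minimal pure-Python sketch of the idea: estimate the artifact for each epoch by averaging neighbouring epochs, then subtract that template. This is a simplified illustration on synthetic data, not FACETpy's implementation; the function name aas_correct and all of its parameters are hypothetical.

```python
from statistics import fmean


def aas_correct(signal, triggers, epoch_len, window_size):
    """Subtract a sliding-window average artifact template from each epoch.

    Illustrative only: `signal` is a list of floats for one channel and
    `triggers` holds the start index of each artifact epoch.
    """
    epochs = [signal[t:t + epoch_len] for t in triggers]
    corrected = list(signal)
    for i, start in enumerate(triggers):
        # Pick up to `window_size` neighbouring epochs centred on epoch i.
        lo = max(0, i - window_size // 2)
        hi = min(len(epochs), lo + window_size)
        lo = max(0, hi - window_size)
        # Average the neighbours sample by sample to estimate the artifact.
        template = [fmean(column) for column in zip(*epochs[lo:hi])]
        for k, value in enumerate(template):
            corrected[start + k] -= value
    return corrected


# Synthetic data: a repeating artifact plus a small epoch-wise offset ("EEG").
artifact = [0.0, 5.0, -3.0, 1.0]
eeg = [0.25 if (i // 4) % 2 == 0 else -0.25 for i in range(24)]
signal = [artifact[i % 4] + eeg[i] for i in range(24)]
triggers = list(range(0, 24, 4))

corrected = aas_correct(signal, triggers, epoch_len=4, window_size=6)
print(max(abs(v) for v in signal), max(abs(v) for v in corrected))
```

Because the artifact repeats identically while the synthetic EEG averages out across epochs, the subtraction recovers the EEG component. Real recordings need the alignment and filtering steps listed above for this assumption to hold approximately.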

Custom Pipeline

For more control, build a custom pipeline:

from facet.core import Pipeline
from facet.io import Loader, EDFExporter
from facet.preprocessing import TriggerDetector, UpSample, DownSample
from facet.correction import AASCorrection
from facet.evaluation import SNRCalculator, MetricsReport

pipeline = Pipeline([
    # Load your data
    Loader(path="data.edf", preload=True),

    # Detect fMRI triggers
    TriggerDetector(regex=r"\b1\b"),

    # Upsample for precision
    UpSample(factor=10),

    # Main artifact correction
    AASCorrection(
        window_size=30,              # Size of sliding window
        correlation_threshold=0.975  # Correlation threshold
    ),

    # Downsample back
    DownSample(factor=10),

    # Evaluate correction quality
    SNRCalculator(),
    MetricsReport(),

    # Save corrected data
    EDFExporter(path="corrected.edf", overwrite=True)
], name="My fMRI Correction Pipeline")

# Run it (recommended default for memory efficiency)
result = pipeline.run(channel_sequential=True)

Step-by-Step Processing

For maximum control, process step by step:

from facet.io import Loader
from facet.preprocessing import TriggerDetector, UpSample
from facet.correction import AASCorrection

# 1. Load data
loader = Loader(path="data.edf", preload=True)
context = loader.execute(None)

# 2. Detect triggers
detector = TriggerDetector(regex=r"\b1\b")
context = detector.execute(context)
print(f"Found {len(context.get_triggers())} triggers")

# 3. Upsample
upsampler = UpSample(factor=10)
context = upsampler.execute(context)

# 4. Apply correction
aas = AASCorrection(window_size=30)
context = aas.execute(context)

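# 5. Access results (data is still at the upsampled rate; no DownSample was applied)
corrected_raw = context.get_raw()
# MNE expects raw FIF filenames to end in raw.fif and warns otherwise
corrected_raw.save("corrected_raw.fif", overwrite=True)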

Pipe Operator (|)

You can apply processors directly to a ProcessingContext using the pipe operator | (implemented via __or__):

from facet import load, HighPassFilter, TriggerDetector, UpSample, AASCorrection

ctx = load("data.edf", preload=True)
ctx = (
    ctx
    | HighPassFilter(1.0)
    | TriggerDetector(r"\b1\b")
    | UpSample(10)
    | AASCorrection(window_size=30)
)
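
For readers unfamiliar with operator overloading, the following sketch shows how a context class can support this syntax by defining __or__. The classes ToyContext, RemoveMean, and Clip are hypothetical stand-ins and do not reflect FACETpy's actual classes.

```python
class ToyContext:
    """Hypothetical context that supports `context | processor`."""

    def __init__(self, samples):
        self.samples = list(samples)

    def __or__(self, processor):
        # `ctx | proc` calls ctx.__or__(proc); delegate to the processor.
        return processor.execute(self)


class RemoveMean:
    """Hypothetical processor: subtracts the mean from every sample."""

    def execute(self, context):
        mean = sum(context.samples) / len(context.samples)
        return ToyContext(x - mean for x in context.samples)


class Clip:
    """Hypothetical processor: limits samples to [-limit, +limit]."""

    def __init__(self, limit):
        self.limit = limit

    def execute(self, context):
        return ToyContext(
            max(-self.limit, min(self.limit, x)) for x in context.samples
        )


# Each `|` returns a new context, so the steps chain left to right.
ctx = ToyContext([1.0, 2.0, 6.0]) | RemoveMean() | Clip(1.5)
print(ctx.samples)
```

Each | returns a new context, which is why the chain in the example above reads top to bottom in execution order.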

Execution Modes

Start with channel-sequential execution (recommended):

# Memory-optimized per-channel execution
result = pipeline.run(channel_sequential=True)

If your machine has enough free RAM and you want maximum throughput:

# Use all CPU cores (speed-oriented mode)
result = pipeline.run(parallel=True, n_jobs=-1)

# Use a specific number of cores
result = pipeline.run(parallel=True, n_jobs=4)

Processors automatically split their work by channel when it is safe to do so. For long recordings and upsampling-heavy pipelines, prefer channel_sequential=True to reduce peak memory usage.
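
A rough back-of-the-envelope calculation shows why upsampling drives peak memory. The sketch below only estimates the size of a dense float64 array for a hypothetical recording (32 channels, 5 kHz, 10 minutes). Actual memory use depends on FACETpy's internals and on intermediate copies, so treat the numbers as an order-of-magnitude guide.

```python
def raw_bytes(n_channels, sfreq_hz, duration_s, upsample_factor=1,
              bytes_per_sample=8):
    """Size in bytes of a dense float64 array holding the recording."""
    n_samples = int(sfreq_hz * duration_s) * upsample_factor
    return n_channels * n_samples * bytes_per_sample


# Hypothetical recording: 32 channels, 5 kHz, 10 minutes, upsampled 10x.
all_channels = raw_bytes(32, 5000, 600, upsample_factor=10)
one_channel = raw_bytes(1, 5000, 600, upsample_factor=10)

print(f"all channels at once:  {all_channels / 1e9:.2f} GB")
print(f"one channel at a time: {one_channel / 1e6:.0f} MB")
```

Under these assumptions, holding every upsampled channel at once needs several gigabytes, while a single upsampled channel fits in a few hundred megabytes.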

Common Patterns

BCG Correction

For ballistocardiogram (BCG) artifact correction:

from facet.core import Pipeline
from facet.io import Loader, EDFExporter
from facet.preprocessing import QRSTriggerDetector
from facet.correction import AASCorrection

pipeline = Pipeline([
    Loader(path="data.edf", preload=True),
    QRSTriggerDetector(),  # Detect QRS complexes
    AASCorrection(window_size=20),  # Smaller window for BCG
    EDFExporter(path="corrected.edf")
])

# Run the BCG correction
result = pipeline.run(channel_sequential=True)

Batch Processing

Process multiple files with the same pipeline:

from facet.core import Pipeline
from facet.io import Loader, EDFExporter
from facet.preprocessing import TriggerDetector, UpSample, DownSample
from facet.correction import AASCorrection

# Define reusable correction pipeline
correction = Pipeline([
    TriggerDetector(regex=r"\b1\b"),
    UpSample(factor=10),
    AASCorrection(window_size=30),
    DownSample(factor=10)
])

# Process multiple files
files = ["subject1.edf", "subject2.edf", "subject3.edf"]

for input_file in files:
    print(f"Processing {input_file}...")

    # Load
    loader = Loader(path=input_file, preload=True)
    context = loader.execute(None)

    # Correct
    result = correction.run(initial_context=context, channel_sequential=True)

    if result.success:
        # Save
        output = input_file.replace('.edf', '_corrected.edf')
        exporter = EDFExporter(path=output, overwrite=True)
        exporter.execute(result.context)
        print(f"  ✓ Saved to {output}")
    else:
        # Report the failure and continue with the next file
        print(f"  ✗ Failed: {result.error}")
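
Deriving the output name with str.replace works for simple filenames but rewrites every occurrence of .edf, including any in directory names. A more robust alternative, sketched below with the standard-library pathlib module, changes only the final filename. The helper name corrected_path and the example paths are hypothetical.

```python
from pathlib import PurePosixPath


def corrected_path(input_file):
    """Insert '_corrected' before the final suffix of the filename only."""
    path = PurePosixPath(input_file)
    return str(path.with_name(f"{path.stem}_corrected{path.suffix}"))


simple = corrected_path("subject1.edf")
nested = corrected_path("study.edf_exports/subject1.edf")
# str.replace also rewrites the directory name in the nested case.
naive = "study.edf_exports/subject1.edf".replace(".edf", "_corrected.edf")

print(simple)
print(nested)
print(naive)
```

PurePosixPath is used here only so the example behaves the same on every platform; in a real script, pathlib.Path is the usual choice.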

Accessing Results

Get data and metrics from the result:

result = pipeline.run(channel_sequential=True)

# Corrected MNE Raw object
corrected_raw = result.context.get_raw()

# Trigger positions
triggers = result.context.get_triggers()

# Quality metrics
metrics = result.context.metadata.custom.get('metrics', {})
snr = metrics.get('snr')
rms_ratio = metrics.get('rms_ratio')

# Processing history
for entry in result.context.get_history():
    print(f"{entry.name} at {entry.timestamp}")
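
Because metrics.get returns None when a metric is absent (for example, when evaluation was not enabled), formatting the value directly with :.2f would raise a TypeError. The small helper below is one way to print metrics defensively; format_metric and the sample values are hypothetical and not part of FACETpy.

```python
def format_metric(metrics, key, unit=""):
    """Return a printable line for a metric, tolerating a missing value."""
    value = metrics.get(key)
    if value is None:
        return f"{key}: n/a"
    return f"{key}: {value:.2f}{unit}"


# Hypothetical metrics dictionary in which 'rms_ratio' is absent.
metrics = {"snr": 12.3456}

lines = [
    format_metric(metrics, "snr"),
    format_metric(metrics, "rms_ratio", unit="x"),
]
print("\n".join(lines))
```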

Next Steps