Examples

This page collects FACETpy examples for common scenarios, from a minimal correction pipeline to batch processing, custom processors, format conversion, and debugging.

Running Repository Examples

The scripts on this page live in the repository's examples/ directory. They do not require uv: install FACETpy with pip, then run the scripts from a repository checkout:

pip install facetpy
git clone https://github.com/H0mire/facetpy.git
cd facetpy
python examples/quickstart.py

For full contribution workflows (tests, linting, docs), see Contributing to FACETpy, which describes the uv-based setup.

Basic Correction

Minimal pipeline for basic correction:

from facet import create_standard_pipeline

pipeline = create_standard_pipeline(
    "data.edf",
    "corrected.edf",
    trigger_regex=r"\b1\b"
)

result = pipeline.run()
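
The trigger_regex argument decides which event labels are treated as triggers. The sketch below uses only Python's re module to show which labels the pattern r"\b1\b" picks out. The label strings are made up for illustration, and the sketch assumes the pattern is applied as a search against each label, which this page does not confirm.

```python
import re

# Pattern from the example above: a "1" that is not part of a longer word or number.
TRIGGER_PATTERN = re.compile(r"\b1\b")

# Hypothetical event labels, for illustration only.
labels = ["1", "Stimulus/S  1", "S1", "11", "R128", "TR 1"]


def matching_labels(pattern, candidates):
    """Return the candidates in which the pattern occurs anywhere."""
    return [label for label in candidates if pattern.search(label)]


matched = matching_labels(TRIGGER_PATTERN, labels)
print(matched)
```

Labels such as "S1" or "R128" are not selected because the digit is attached to a letter or another digit, so there is no word boundary next to the "1".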

BCG Correction

Correct ballistocardiogram (BCG) artifacts:

from facet.core import Pipeline
from facet.io import Loader, EDFExporter
from facet.preprocessing import QRSTriggerDetector
from facet.correction import AASCorrection

pipeline = Pipeline([
    Loader(path="data.edf", preload=True),
    QRSTriggerDetector(),  # Detect QRS complexes
    AASCorrection(window_size=20),  # Smaller window for BCG
    EDFExporter(path="corrected.edf")
])

result = pipeline.run()

Batch Processing

Process multiple files:

from facet.core import Pipeline
from facet.preprocessing import TriggerDetector, UpSample, DownSample
from facet.correction import AASCorrection

files = ["s1.edf", "s2.mff", "s3.bdf"]

# Do not add Loader to the pipeline — map() loads each file automatically
pipeline = Pipeline([
    TriggerDetector(regex=r"\b1\b"),
    UpSample(factor=10),
    AASCorrection(window_size=30),
    DownSample(factor=10),
], name="Batch Correction")

results = pipeline.map(
    files,
    on_error="continue",   # log failures, keep going
)
results.print_summary()
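
Instead of listing files by hand, the input list can be collected from a directory. The helper below is a minimal sketch using only pathlib. collect_recordings and RECORDING_SUFFIXES are hypothetical names that are not part of FACETpy, and the suffix set simply mirrors the file list above.

```python
from pathlib import Path

# Suffixes taken from the file list above; adjust to the formats you actually use.
RECORDING_SUFFIXES = {".edf", ".mff", ".bdf"}


def collect_recordings(directory, suffixes=RECORDING_SUFFIXES):
    """Return sorted paths (as strings) of entries in `directory` with a matching suffix."""
    root = Path(directory)
    return sorted(
        str(path)
        for path in root.iterdir()
        if path.suffix.lower() in suffixes  # case-insensitive suffix check
    )
```

The returned list could then be passed to pipeline.map(files, on_error="continue") as in the example above.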

Conditional Processing

Apply processors conditionally:

from facet.core import ConditionalProcessor
from facet import Pipeline, AASCorrection, SNRCalculator, PCACorrection

def needs_pca(context):
    metrics = context.metadata.custom.get('metrics', {})
    return metrics.get('snr', float('inf')) < 10

pipeline = Pipeline([
    # ... preprocessing ...
    AASCorrection(window_size=30),
    SNRCalculator(),
    ConditionalProcessor(
        condition=needs_pca,
        processor=PCACorrection(n_components=0.95)
    ),
    # ... export ...
])
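
The condition is an ordinary function, so it can be checked in isolation before it goes into a pipeline. The sketch below repeats needs_pca from the example above and calls it on stand-in objects that expose only metadata.custom. fake_context is a hypothetical helper and not a real FACETpy context.

```python
from types import SimpleNamespace


def needs_pca(context):
    metrics = context.metadata.custom.get("metrics", {})
    # A missing SNR falls back to infinity, so PCA is skipped when nothing was measured.
    return metrics.get("snr", float("inf")) < 10


def fake_context(custom):
    """Build a stand-in exposing only context.metadata.custom (not a real FACETpy context)."""
    return SimpleNamespace(metadata=SimpleNamespace(custom=custom))


low_snr = fake_context({"metrics": {"snr": 4.2}})
high_snr = fake_context({"metrics": {"snr": 25.0}})
no_metrics = fake_context({})

print(needs_pca(low_snr), needs_pca(high_snr), needs_pca(no_metrics))
```

Only the low-SNR stand-in triggers the extra correction step. An SNR of 10 or higher, or no recorded SNR at all, leaves it switched off.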

Custom Processor

Create your own processor:

from facet.core import Pipeline, Processor, register_processor
from facet.io import EDFExporter, Loader

@register_processor
class CustomDenoiser(Processor):
    name = "custom_denoiser"
    description = "My custom denoising algorithm"

    def __init__(self, threshold=0.5):
        self.threshold = threshold
        super().__init__()

    def process(self, context):
        raw = context.get_raw()

        # Your custom logic here
        denoised_data = my_denoising_algorithm(
            raw.get_data(copy=False),
            threshold=self.threshold
        )

        # Create new raw with denoised data
        raw_denoised = raw.copy()
        raw_denoised._data = denoised_data

        return context.with_raw(raw_denoised)

# Use it
pipeline = Pipeline([
    Loader(path="data.edf", preload=True),
    CustomDenoiser(threshold=0.5),
    EDFExporter(path="denoised.edf")
])
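
my_denoising_algorithm in the example above is a placeholder that this page does not define. The sketch below fills it with simple soft thresholding in NumPy so that the example has something concrete to call. The choice of soft thresholding is an assumption made purely for illustration and is not a method provided by FACETpy.

```python
import numpy as np


def my_denoising_algorithm(data, threshold=0.5):
    """Hypothetical stand-in: soft-threshold every sample toward zero.

    Values whose magnitude is below `threshold` become zero; larger values
    shrink by `threshold`. A new array is returned and the input is not modified.
    """
    data = np.asarray(data, dtype=float)
    return np.sign(data) * np.maximum(np.abs(data) - threshold, 0.0)


example = np.array([[1.0, -0.2, -2.0]])
print(my_denoising_algorithm(example, threshold=0.5))
```

MNE stores EEG data in volts, so a threshold of 0.5 would zero out real recordings. Pick the threshold in the units of the data you pass in.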

Accessing Metrics

Get quality metrics:

from facet.core import Pipeline
from facet.evaluation import SNRCalculator, RMSCalculator

pipeline = Pipeline([
    # ... correction steps ...
    SNRCalculator(),
    RMSCalculator()
])

result = pipeline.run()

if result.success:
    metrics = result.context.metadata.custom['metrics']
    print(f"SNR: {metrics['snr']:.2f}")
    print(f"RMS improvement: {metrics['rms_ratio']:.2f}x")
    print(f"Per-channel SNR: {metrics['snr_per_channel']}")
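
Indexing the metrics dictionary directly raises KeyError if a calculator did not run or did not store its value. The helper below is a minimal sketch that formats only the keys that are present. summarize_metrics is a hypothetical name, and the key names are the ones used in the example above.

```python
def summarize_metrics(metrics):
    """Return one formatted line per known metric, skipping keys that are absent."""
    lines = []
    if "snr" in metrics:
        lines.append(f"SNR: {metrics['snr']:.2f}")
    if "rms_ratio" in metrics:
        lines.append(f"RMS improvement: {metrics['rms_ratio']:.2f}x")
    if "snr_per_channel" in metrics:
        lines.append(f"Per-channel SNR: {metrics['snr_per_channel']}")
    return lines


# Made-up values, for illustration only.
for line in summarize_metrics({"snr": 12.5, "rms_ratio": 3.0}):
    print(line)
```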

Manual Trigger Specification

Provide triggers manually instead of detecting:

from facet.io import Loader
from facet.correction import AASCorrection

# Load data
loader = Loader(path="data.edf", preload=True)
context = loader.execute(None)

# Manually set trigger samples.
# artifact_length defaults to the median sample distance between triggers.
context = context.with_trigger_samples([1000, 2000, 3000, 4000])

# Continue with correction
aas = AASCorrection(window_size=30)
context = aas.execute(context)
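
The comment above says that artifact_length defaults to the median sample distance between triggers. The sketch below reproduces that rule with the standard library so you can check what value to expect for your own trigger list. It illustrates the stated rule and is not FACETpy's internal code; median_trigger_distance is a hypothetical name.

```python
from statistics import median


def median_trigger_distance(trigger_samples):
    """Median number of samples between consecutive triggers."""
    if len(trigger_samples) < 2:
        raise ValueError("need at least two triggers to measure a distance")
    ordered = sorted(trigger_samples)
    gaps = [later - earlier for earlier, later in zip(ordered, ordered[1:])]
    return median(gaps)


print(median_trigger_distance([1000, 2000, 3000, 4000]))
print(median_trigger_distance([1000, 2000, 3000, 5000]))  # one missed trigger
```

Because the median is used, a single missed trigger (the 2000-sample gap in the second call) does not change the result.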

Working with Different File Formats

BIDS Format

from facet import Pipeline
from facet.io import BIDSLoader, BIDSExporter

pipeline = Pipeline([
    BIDSLoader(
        root="/path/to/bids",
        subject="01",
        session="01",
        task="rest",
        run="01",
    ),
    # ... correction ...
    BIDSExporter(
        root="/path/to/bids_corrected",
        subject="01",
        session="01",
        task="rest"
    )
])

GDF Format

from facet.core import Pipeline
from facet.correction import AASCorrection
from facet.io import EDFExporter, Loader
from facet.preprocessing import DownSample, TriggerDetector, UpSample

pipeline = Pipeline([
    Loader(path="data.gdf", preload=True),
    # ... correction ...
    EDFExporter(path="corrected.edf")
])

Converting Between Formats

Use the convenience functions facet.load() and facet.export() to convert between supported formats (for example EDF, BDF, BrainVision, EEGLAB, and FIF):

from facet import load, export

INPUT_FILE = "./examples/datasets/NiazyFMRI.set"
OUTPUT_FILE = "./examples/datasets/NiazyFMRI.bdf"

ctx = load(INPUT_FILE)
export(ctx, OUTPUT_FILE)
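
When converting many files, the output name can be derived from the input name instead of being typed out. The sketch below uses pathlib only. converted_path is a hypothetical helper, and it assumes that the output format is chosen from the file extension, as the SET to BDF example above suggests.

```python
from pathlib import Path


def converted_path(input_file, new_suffix):
    """Return the input path with its extension replaced by `new_suffix` (e.g. ".bdf")."""
    return Path(input_file).with_suffix(new_suffix)


print(converted_path("./examples/datasets/NiazyFMRI.set", ".bdf"))
```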

Performance Optimization

Parallel Processing

# `pipeline` is any Pipeline built as in the examples above.
# Maximum parallelization
result = pipeline.run(parallel=True, n_jobs=-1)

Memory-Efficient Processing

# For very large files, process in chunks (future feature)
# Currently, use preload=True and ensure sufficient RAM
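
To judge whether a recording fits in RAM, a rough size estimate helps. The sketch below assumes the samples are held as 64-bit floats (as MNE does for preloaded data) and counts only a single copy of the data array, so treat the result as a lower bound. estimate_data_bytes and the recording parameters are hypothetical.

```python
BYTES_PER_SAMPLE = 8  # one float64 value


def estimate_data_bytes(n_channels, sfreq_hz, duration_s, upsample_factor=1):
    """Approximate size in bytes of one in-memory copy of the data array."""
    n_samples = int(sfreq_hz * duration_s) * upsample_factor
    return n_channels * n_samples * BYTES_PER_SAMPLE


# Made-up recording: 32 channels, 5 kHz, 10 minutes.
base = estimate_data_bytes(32, 5000, 600)
upsampled = estimate_data_bytes(32, 5000, 600, upsample_factor=10)
print(f"{base / 1e9:.2f} GB at native rate, {upsampled / 1e9:.2f} GB after 10x upsampling")
```

Upsampling multiplies the footprint by the upsampling factor, which is why a step such as UpSample(factor=10) dominates memory use in the pipelines on this page.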

Debugging

Check Processing History

result = pipeline.run()

if result.success:
    for entry in result.context.get_history():
        print(f"{entry.name}")
        print(f"  Timestamp: {entry.timestamp}")
        print(f"  Parameters: {entry.parameters}")

Save Intermediate Results

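from facet.core import Pipeline
from facet.correction import AASCorrection
from facet.io import Loader
from facet.preprocessing import DownSample, TriggerDetector, UpSample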

# Create sub-pipelines to save intermediate results
preprocessing = Pipeline([
    Loader(path="data.edf", preload=True),
    TriggerDetector(regex=r"\b1\b"),
    UpSample(factor=10)
])

result1 = preprocessing.run()
# MNE warns unless raw FIF file names end in a suffix such as _raw.fif
result1.context.get_raw().save("after_preprocessing_raw.fif")

# Continue from there
correction = Pipeline([
    AASCorrection(window_size=30),
    DownSample(factor=10)
])

result2 = correction.run(initial_context=result1.context)

More Examples

See the examples/ directory in the repository for more complete examples:

  • complete_pipeline_example.py - Full correction workflow

  • advanced_workflows.py - Conditional steps, parallel execution, factory shortcut

  • channelwise_execution.py - Channel-wise execution: flag inspection, backend comparison, custom processor

  • batch_processing.py - Batch correction across multiple files with Pipeline.map()

  • evaluation.py - SNR, RMS, and other quality metrics

  • convert_types.py - Convert between file formats (e.g. SET to BDF) using load and export

  • inline_steps.py - Inline callable steps and ProcessingContext pipe operator

  • memory_efficient_pipeline.py - Streaming-style workflow for large recordings

  • quickstart.py - Minimal runnable correction example

  • eeg_generation_visualization_example.py - Synthetic EEG generation and plotting