Examples
This page collects examples of using FACETpy in common scenarios.
Running Repository Examples
The scripts on this page live in the repository examples/ directory.
You can run them without uv.
Install FACETpy with pip, then run the scripts from a repository checkout:
pip install facetpy
git clone https://github.com/H0mire/facetpy.git
cd facetpy
python examples/quickstart.py
For the full contribution workflow (tests, linting, docs), see the uv-based guide Contributing to FACETpy.
Basic Correction
Minimal pipeline for basic correction:
from facet import create_standard_pipeline
pipeline = create_standard_pipeline(
    "data.edf",
    "corrected.edf",
    trigger_regex=r"\b1\b",
)
result = pipeline.run()
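The trigger_regex value r"\b1\b" uses word boundaries, so it matches a standalone 1 but not 10 or 21. The check below uses only Python's re module. The annotation labels are made up for illustration, and applying the pattern with search semantics is an assumption about how the pattern is used, not something this page states.

```python
import re

# Pattern from the example above: a "1" delimited by word boundaries.
TRIGGER_PATTERN = re.compile(r"\b1\b")

# Hypothetical annotation labels, for illustration only.
labels = ["1", "10", "Stimulus/1", "S 1", "21", "R128"]

# Assumption: the pattern may match anywhere inside a label (search semantics).
matching = [label for label in labels if TRIGGER_PATTERN.search(label)]
print(matching)
```

If your recording uses a different trigger label, testing the pattern against a few real annotation strings this way is a quick sanity check before running a full correction.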
BCG Correction
Correct ballistocardiogram (BCG) artifacts:
from facet.core import Pipeline
from facet.io import Loader, EDFExporter
from facet.preprocessing import QRSTriggerDetector
from facet.correction import AASCorrection
pipeline = Pipeline([
    Loader(path="data.edf", preload=True),
    QRSTriggerDetector(),            # Detect QRS complexes
    AASCorrection(window_size=20),   # Smaller window for BCG
    EDFExporter(path="corrected.edf"),
])
result = pipeline.run()
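To give a feel for what window_size controls, here is a minimal NumPy sketch of averaged artifact subtraction (AAS): for each trigger, a template is averaged from neighbouring artifact epochs and subtracted. This is a simplified illustration, not FACETpy's implementation. The function name aas_sketch, the fixed artifact length, and the centred window are all assumptions, and real BCG epochs vary in length with heart rate.

```python
import numpy as np

def aas_sketch(signal, triggers, artifact_length, window_size):
    """Subtract a locally averaged artifact template from each epoch (illustrative only)."""
    corrected = signal.astype(float).copy()
    # One epoch per trigger, stacked to shape (n_epochs, artifact_length).
    epochs = np.stack([signal[t:t + artifact_length] for t in triggers]).astype(float)
    half = window_size // 2
    for i, t in enumerate(triggers):
        # Average the epochs in a window centred on epoch i, clipped at the edges.
        lo = max(0, i - half)
        hi = min(len(triggers), i + half + 1)
        template = epochs[lo:hi].mean(axis=0)
        corrected[t:t + artifact_length] -= template
    return corrected

# Synthetic demo: a repeating artifact added to low-amplitude noise.
rng = np.random.default_rng(0)
artifact = 50.0 * np.sin(np.linspace(0, 2 * np.pi, 100))
triggers = np.arange(0, 2000, 100)
background = rng.normal(scale=1.0, size=2000)
contaminated = background + np.tile(artifact, 20)

cleaned = aas_sketch(contaminated, triggers, artifact_length=100, window_size=20)
print(np.std(contaminated), np.std(cleaned))
```

A smaller window lets the template follow artifacts that change shape over time, at the cost of averaging fewer epochs and therefore removing more of the underlying signal.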
Batch Processing
Process multiple files:
from facet.core import Pipeline
from facet.preprocessing import TriggerDetector, UpSample, DownSample
from facet.correction import AASCorrection
files = ["s1.edf", "s2.mff", "s3.bdf"]
# Do not add Loader to the pipeline — map() loads each file automatically
pipeline = Pipeline([
    TriggerDetector(regex=r"\b1\b"),
    UpSample(factor=10),
    AASCorrection(window_size=30),
    DownSample(factor=10),
], name="Batch Correction")
results = pipeline.map(
    files,
    on_error="continue",  # log failures, keep going
)
results.print_summary()
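The on_error="continue" option follows a common batch pattern: record each failure and move on to the next item. The sketch below shows that general pattern in plain Python. It is not how Pipeline.map() is implemented. Both map_continue_on_error and fake_correct are hypothetical names, and the failing file is chosen arbitrarily for the demo.

```python
def map_continue_on_error(func, items):
    """Apply func to every item, recording failures instead of stopping at the first one."""
    successes, failures = {}, {}
    for item in items:
        try:
            successes[item] = func(item)
        except Exception as exc:  # remember the failure and keep going
            failures[item] = exc
    return successes, failures

def fake_correct(path):
    """Hypothetical stand-in for running a pipeline on one file."""
    if path.endswith(".mff"):  # arbitrary failure, for the demo only
        raise ValueError(f"cannot read {path}")
    return f"corrected:{path}"

successes, failures = map_continue_on_error(
    fake_correct, ["s1.edf", "s2.mff", "s3.bdf"]
)
print(sorted(successes))
print({path: str(exc) for path, exc in failures.items()})
```

Collecting failures rather than raising immediately means one unreadable recording does not discard the results of a long batch run.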
Conditional Processing
Apply processors conditionally:
from facet.core import ConditionalProcessor
from facet import Pipeline, AASCorrection, SNRCalculator, PCACorrection
def needs_pca(context):
    metrics = context.metadata.custom.get('metrics', {})
    return metrics.get('snr', float('inf')) < 10

pipeline = Pipeline([
    # ... preprocessing ...
    AASCorrection(window_size=30),
    SNRCalculator(),
    ConditionalProcessor(
        condition=needs_pca,
        processor=PCACorrection(n_components=0.95),
    ),
    # ... export ...
])
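The needs_pca predicate above can be exercised without running a pipeline. The sketch below copies the predicate and calls it with a stand-in object that exposes only context.metadata.custom. The fake_context helper is hypothetical and is not part of FACETpy.

```python
from types import SimpleNamespace

def needs_pca(context):
    metrics = context.metadata.custom.get('metrics', {})
    return metrics.get('snr', float('inf')) < 10

def fake_context(custom):
    """Hypothetical stand-in exposing only context.metadata.custom."""
    return SimpleNamespace(metadata=SimpleNamespace(custom=custom))

low_snr = needs_pca(fake_context({"metrics": {"snr": 4.2}}))    # below the threshold
high_snr = needs_pca(fake_context({"metrics": {"snr": 25.0}}))  # above the threshold
no_metrics = needs_pca(fake_context({}))                        # SNR never computed

print(low_snr, high_snr, no_metrics)
```

Because the default is float('inf'), a missing SNR value makes the condition false, so PCA is skipped rather than applied blindly when no metric has been computed.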
Custom Processor
Create your own processor:
from facet.core import Pipeline, Processor, register_processor
from facet.io import EDFExporter, Loader
@register_processor
class CustomDenoiser(Processor):
    name = "custom_denoiser"
    description = "My custom denoising algorithm"

    def __init__(self, threshold=0.5):
        self.threshold = threshold
        super().__init__()

    def process(self, context):
        raw = context.get_raw()
        # Your custom logic here. my_denoising_algorithm is a placeholder for
        # your own function; it must return an array with the same shape.
        denoised_data = my_denoising_algorithm(
            raw.get_data(copy=False),
            threshold=self.threshold,
        )
        # Create a new raw object holding the denoised data
        # (assigning _data requires the data to be preloaded)
        raw_denoised = raw.copy()
        raw_denoised._data = denoised_data
        return context.with_raw(raw_denoised)

# Use it
pipeline = Pipeline([
    Loader(path="data.edf", preload=True),
    CustomDenoiser(threshold=0.5),
    EDFExporter(path="denoised.edf"),
])
Accessing Metrics
Get quality metrics:
from facet.core import Pipeline
from facet.evaluation import SNRCalculator, RMSCalculator
pipeline = Pipeline([
    # ... correction steps ...
    SNRCalculator(),
    RMSCalculator(),
])
result = pipeline.run()
if result.success:
    metrics = result.context.metadata.custom['metrics']
    print(f"SNR: {metrics['snr']:.2f}")
    print(f"RMS improvement: {metrics['rms_ratio']:.2f}x")
    print(f"Per-channel SNR: {metrics['snr_per_channel']}")
Manual Trigger Specification
Provide triggers manually instead of detecting:
from facet.io import Loader
from facet.correction import AASCorrection
# Load data
loader = Loader(path="data.edf", preload=True)
context = loader.execute(None)
# Manually set trigger samples.
# artifact_length defaults to the median sample distance between triggers.
context = context.with_trigger_samples([1000, 2000, 3000, 4000])
# Continue with correction
aas = AASCorrection(window_size=30)
context = aas.execute(context)
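The default artifact_length described in the comment above, the median sample distance between triggers, can be checked by hand. The sketch below uses only the standard library. The helper name median_trigger_distance is hypothetical, and whether FACETpy sorts or rounds the values is not stated on this page.

```python
from statistics import median

def median_trigger_distance(trigger_samples):
    """Median gap, in samples, between consecutive triggers."""
    ordered = sorted(trigger_samples)
    gaps = [later - earlier for earlier, later in zip(ordered, ordered[1:])]
    return median(gaps)

# Trigger positions from the example above: every gap is 1000 samples.
regular = median_trigger_distance([1000, 2000, 3000, 4000])

# One missed trigger (no entry at 4000) leaves the median unchanged.
with_gap = median_trigger_distance([1000, 2000, 3000, 5000, 6000, 7000])

print(regular, with_gap)
```

Using the median rather than the mean keeps the estimate stable when a trigger is missing or duplicated in a manually supplied list.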
Working with Different File Formats
BIDS Format
from facet import Pipeline
from facet.io import BIDSLoader, BIDSExporter
pipeline = Pipeline([
    BIDSLoader(
        root="/path/to/bids",
        subject="01",
        session="01",
        task="rest",
        run="01",
    ),
    # ... correction ...
    BIDSExporter(
        root="/path/to/bids_corrected",
        subject="01",
        session="01",
        task="rest",
    ),
])
GDF Format
from facet.core import Pipeline
from facet.io import Loader, EDFExporter
pipeline = Pipeline([
    Loader(path="data.gdf", preload=True),
    # ... correction ...
    EDFExporter(path="corrected.edf"),
])
Converting Between Formats
Use the convenience functions facet.load() and facet.export() to
convert between supported formats (for example EDF, BDF, BrainVision, EEGLAB,
and FIF):
from facet import load, export
INPUT_FILE = "./examples/datasets/NiazyFMRI.set"
OUTPUT_FILE = "./examples/datasets/NiazyFMRI.bdf"
ctx = load(INPUT_FILE)
export(ctx, OUTPUT_FILE)
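In the example above the output file differs from the input only in its extension. When converting many files, the output path can be derived rather than typed out. The sketch below uses pathlib only. The helper converted_path is hypothetical, and it assumes, as the example suggests, that the target format follows from the file extension.

```python
from pathlib import Path

def converted_path(input_file, new_suffix):
    """Return input_file with its extension replaced by new_suffix."""
    return Path(input_file).with_suffix(new_suffix)

# Same dataset as the example above, SET to BDF.
out = converted_path("./examples/datasets/NiazyFMRI.set", ".bdf")
print(out.name)
```

The resulting Path can be converted with str(out) if a plain string is needed for the output argument.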
Performance Optimization
Parallel Processing
# Maximum parallelization
result = pipeline.run(parallel=True, n_jobs=-1)
Memory-Efficient Processing
# For very large files, process in chunks (future feature)
# Currently, use preload=True and ensure sufficient RAM
Debugging
Check Processing History
result = pipeline.run()
if result.success:
    for entry in result.context.get_history():
        print(entry.name)
        print(f"  Timestamp: {entry.timestamp}")
        print(f"  Parameters: {entry.parameters}")
Save Intermediate Results
from facet.core import Pipeline
from facet.io import Loader
from facet.preprocessing import TriggerDetector, UpSample, DownSample
from facet.correction import AASCorrection
# Create sub-pipelines to save intermediate results
preprocessing = Pipeline([
    Loader(path="data.edf", preload=True),
    TriggerDetector(regex=r"\b1\b"),
    UpSample(factor=10),
])
result1 = preprocessing.run()
result1.context.get_raw().save("after_preprocessing.fif")
# Continue from there
correction = Pipeline([
    AASCorrection(window_size=30),
    DownSample(factor=10),
])
result2 = correction.run(initial_context=result1.context)
More Examples
See the examples/ directory in the repository for more complete examples:
complete_pipeline_example.py - Full correction workflow
advanced_workflows.py - Conditional steps, parallel execution, factory shortcut
channelwise_execution.py - Channel-wise execution: flag inspection, backend comparison, custom processor
batch_processing.py - Batch correction across multiple files with Pipeline.map()
evaluation.py - SNR, RMS, and other quality metrics
convert_types.py - Convert between file formats (e.g. SET to BDF) using load and export
inline_steps.py - Inline callable steps and ProcessingContext pipe operator
memory_efficient_pipeline.py - Streaming-style workflow for large recordings
quickstart.py - Minimal runnable correction example
eeg_generation_visualization_example.py - Synthetic EEG generation and plotting