Pipeline Guide
Pipelines are the primary way to compose and execute processing workflows in FACETpy. This guide covers everything you need to know about creating and using pipelines.
What is a Pipeline?
A Pipeline is a container that executes a sequence of processors in order. Each processor receives the context produced by the previous processor, transforms it, and passes the resulting context on to the next processor.
from facet.core import Pipeline
from facet.preprocessing import TriggerDetector, UpSample
from facet.correction import AASCorrection
pipeline = Pipeline([
    TriggerDetector(regex=r"\b1\b"),
    UpSample(factor=10),
    AASCorrection(window_size=30),
])
# `context` is an existing ProcessingContext (see "With Initial Context" below)
result = pipeline.run(initial_context=context)
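The ordering contract (each processor consumes the context the previous one returned) can be modeled without FACETpy at all. The sketch below is a toy, not FACETpy's implementation: ToyContext, run_sequentially, scale and offset are hypothetical names, and processors are plain functions from context to context.

```python
from dataclasses import dataclass, field, replace
from typing import Callable, List


@dataclass(frozen=True)
class ToyContext:
    """Hypothetical stand-in for ProcessingContext: samples plus a step log."""
    data: List[float]
    history: List[str] = field(default_factory=list)


# In this sketch a "processor" is any callable ToyContext -> ToyContext.
Processor = Callable[[ToyContext], ToyContext]


def run_sequentially(processors: List[Processor], ctx: ToyContext) -> ToyContext:
    """Feed the context returned by each step into the next one, in order."""
    for processor in processors:
        ctx = processor(ctx)
    return ctx


def scale(factor: float) -> Processor:
    def step(ctx: ToyContext) -> ToyContext:
        return replace(ctx, data=[x * factor for x in ctx.data],
                       history=ctx.history + [f"scale({factor})"])
    return step


def offset(value: float) -> Processor:
    def step(ctx: ToyContext) -> ToyContext:
        return replace(ctx, data=[x + value for x in ctx.data],
                       history=ctx.history + [f"offset({value})"])
    return step


final = run_sequentially([scale(2.0), offset(1.0)], ToyContext(data=[1.0, 2.0]))
print(final.data)     # [3.0, 5.0]
print(final.history)  # ['scale(2.0)', 'offset(1.0)']
```

Swapping the two steps yields [4.0, 6.0] instead, which is why processor order in a Pipeline matters.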
Basic Pipeline Usage
Creating a Pipeline
Create a pipeline by passing a list of processors:
from facet.core import Pipeline
from facet.io import Loader, EDFExporter
from facet.preprocessing import TriggerDetector
from facet.correction import AASCorrection
pipeline = Pipeline([
    Loader(path="data.edf", preload=True),
    TriggerDetector(regex=r"\b1\b"),
    AASCorrection(window_size=30),
    EDFExporter(path="output.edf", overwrite=True)
], name="My Pipeline")
Running a Pipeline
Execute the pipeline with the run() method:
result = pipeline.run()
if result.success:
    print(f"Pipeline completed in {result.execution_time:.2f}s")
    final_context = result.context
else:
    print(f"Pipeline failed: {result.error}")
With Initial Context
Start with an existing context:
from facet.core import ProcessingContext
# Create initial context (`raw` and `metadata` come from your own loading code)
initial_context = ProcessingContext(raw=raw, metadata=metadata)
# Run pipeline starting from this context
result = pipeline.run(initial_context=initial_context)
Pipeline Results
The PipelineResult object contains:
success (bool) - Whether the pipeline completed successfully
context (ProcessingContext) - Final processing context
error (Exception) - Exception raised if the pipeline failed
execution_time (float) - Total execution time in seconds
result = pipeline.run()
print(f"Success: {result.success}")
print(f"Time: {result.execution_time:.2f}s")
if result.success:
    # Access final data
    corrected_raw = result.context.get_raw()
    metrics = result.context.metadata.custom.get('metrics', {})
    # Access processing history
    history = result.context.get_history()
    for step in history:
        print(f"- {step.name}: {step.parameters}")
else:
    print(f"Error: {result.error}")
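One plausible way to produce the four result fields listed above is to time the run and catch the first exception instead of letting it propagate. This is a hypothetical sketch (ToyResult and run_with_result are not FACETpy names); in it, a failed run keeps the last context that was successfully produced.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


@dataclass
class ToyResult:
    """Hypothetical mirror of the PipelineResult fields described above."""
    success: bool
    context: Any
    error: Optional[Exception]
    execution_time: float


def run_with_result(steps: List[Callable[[Any], Any]], ctx: Any) -> ToyResult:
    """Run steps in order; capture the first exception instead of raising it."""
    start = time.perf_counter()
    try:
        for step in steps:
            ctx = step(ctx)
    except Exception as exc:  # record the failure, keep the last good context
        return ToyResult(False, ctx, exc, time.perf_counter() - start)
    return ToyResult(True, ctx, None, time.perf_counter() - start)


ok = run_with_result([lambda x: x + 1, lambda x: x * 10], 1)
print(ok.success, ok.context)  # True 20


def explode(_: Any) -> Any:
    raise ValueError("bad trigger regex")


failed = run_with_result([lambda x: x + 1, explode], 1)
print(failed.success, failed.context, failed.error)  # False 2 bad trigger regex
```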
Advanced Pipeline Features
Pipe Operator
For ad-hoc workflows outside a Pipeline, ProcessingContext supports the
pipe operator (__or__). The right-hand side can be a processor instance or
any callable accepting and returning a context.
from facet import load, HighPassFilter, TriggerDetector, AASCorrection
ctx = load("data.edf", preload=True)
ctx = (
    ctx
    | HighPassFilter(1.0)
    | TriggerDetector(r"\b1\b")
    | AASCorrection(window_size=30)
)
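Python's `|` operator dispatches to the left operand's __or__ method, which is what makes `ctx | step` chains possible. The sketch below uses a hypothetical PipeableContext and a callable AddTag stand-in to show the mechanism; how FACETpy's own __or__ invokes processor instances is not assumed here.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class PipeableContext:
    """Hypothetical context supporting `ctx | step`; not FACETpy's class."""
    steps_applied: List[str] = field(default_factory=list)

    def __or__(self, step: Callable[["PipeableContext"], "PipeableContext"]) -> "PipeableContext":
        # `ctx | step` calls ctx.__or__(step); any callable is accepted.
        if not callable(step):
            return NotImplemented  # defer to the right operand's __ror__, else TypeError
        return step(self)


class AddTag:
    """Stand-in for a processor instance; here processing is just __call__."""

    def __init__(self, tag: str) -> None:
        self.tag = tag

    def __call__(self, ctx: PipeableContext) -> PipeableContext:
        return PipeableContext(ctx.steps_applied + [self.tag])


ctx = (
    PipeableContext()
    | AddTag("highpass")
    | AddTag("triggers")
    | (lambda c: PipeableContext(c.steps_applied + ["aas"]))  # plain callable
)
print(ctx.steps_applied)  # ['highpass', 'triggers', 'aas']
```

Because `|` is left-associative, each step receives the context returned by the step to its left.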
Inline Lambdas for Context-Dependent Parameters
Pipeline steps can be plain callables (lambdas or def functions) that take a
ProcessingContext and return one. The Pipeline wraps them in a
LambdaProcessor, so they behave like any other processor.
This is useful when a processor needs parameters that depend on the current
context at runtime (e.g. data loaded earlier in the pipeline). For example,
RawPlotter expects a fixed duration in seconds when constructed, but you
may want to plot the full dataset whose length is unknown until the pipeline
runs. Use a lambda that creates and runs the processor with a context-derived
parameter:
from facet import Pipeline, RawPlotter, Loader
from pathlib import Path
output_dir = Path("./output")
pipeline = Pipeline([
    Loader(path="data.edf", preload=True),
    # ... other steps ...
    # Plot full dataset: duration comes from context at runtime
    lambda ctx: ctx | RawPlotter(
        mode="matplotlib",
        channel="Fp1",
        duration=ctx.get_raw().times[-1],  # full recording length
        overlay_original=False,
        save_path=str(output_dir / "before_after.png"),
        show=True,
        title="Fp1 — Before vs After Correction",
    ),
])
The lambda is invoked only when the pipeline reaches that step, so
ctx.get_raw() is available and times[-1] (the time of the last sample, in
seconds) gives the length of the recording. Alternatively, pass duration=0
to RawPlotter to plot the entire dataset without a lambda.
Parallel Execution
Enable parallel processing for compatible processors:
# Use all CPU cores
result = pipeline.run(parallel=True, n_jobs=-1)
# Use specific number of cores
result = pipeline.run(parallel=True, n_jobs=4)
Processors marked with parallel_safe = True will be automatically
parallelized by channel when this is enabled.
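Channel-wise parallelism works when a processor treats each channel independently, so channels can be processed concurrently and reassembled in their original order. The sketch below illustrates the idea with concurrent.futures and is not FACETpy's scheduler; apply_per_channel, resolve_n_jobs and demean are hypothetical helpers, and only n_jobs=-1 gets special meaning (all cores). It uses threads to stay self-contained; for CPU-bound pure-Python work, process-based workers usually scale better because of the GIL.

```python
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List

Channel = List[float]


def resolve_n_jobs(n_jobs: int) -> int:
    """-1 means all CPU cores; any other value is used as-is (minimum 1)."""
    cpus = os.cpu_count() or 1
    return cpus if n_jobs == -1 else max(1, n_jobs)


def apply_per_channel(func: Callable[[Channel], Channel], channels: List[Channel],
                      n_jobs: int = 1) -> List[Channel]:
    """Run `func` on each channel independently; results keep channel order."""
    if n_jobs == 1:
        return [func(ch) for ch in channels]
    with ThreadPoolExecutor(max_workers=resolve_n_jobs(n_jobs)) as pool:
        return list(pool.map(func, channels))  # map() preserves input order


def demean(ch: Channel) -> Channel:
    # A channel-independent operation, i.e. the kind that is safe to parallelize.
    mean = sum(ch) / len(ch)
    return [x - mean for x in ch]


data = [[1.0, 2.0, 3.0], [10.0, 20.0, 30.0]]
print(apply_per_channel(demean, data, n_jobs=-1))  # [[-1.0, 0.0, 1.0], [-10.0, 0.0, 10.0]]
```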
Fluent API
Modify pipelines using the fluent API:
pipeline = Pipeline([processor1, processor2])
# Add processor
pipeline.add(processor3)
# Insert at position
pipeline.insert(1, new_processor)
# Remove processor
pipeline.remove(2)
# Method chaining
pipeline.add(processor4).add(processor5)
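Method chaining works because each mutating method returns the pipeline itself. A minimal sketch, assuming (as the example suggests) that remove() takes a position like insert(); FluentPipeline is a hypothetical class and the processors are plain strings.

```python
from typing import Any, List


class FluentPipeline:
    """Hypothetical minimal pipeline whose mutators return `self` for chaining."""

    def __init__(self, processors: List[Any]) -> None:
        self.processors = list(processors)

    def add(self, processor: Any) -> "FluentPipeline":
        self.processors.append(processor)
        return self

    def insert(self, index: int, processor: Any) -> "FluentPipeline":
        self.processors.insert(index, processor)
        return self

    def remove(self, index: int) -> "FluentPipeline":
        # Assumption: remove() takes a position, mirroring insert().
        del self.processors[index]
        return self


p = FluentPipeline(["p1", "p2"])
p.add("p3")          # ['p1', 'p2', 'p3']
p.insert(1, "new")   # ['p1', 'new', 'p2', 'p3']
p.remove(2)          # ['p1', 'new', 'p3']
p.add("p4").add("p5")
print(p.processors)  # ['p1', 'new', 'p3', 'p4', 'p5']
```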
Pipeline Inspection
Inspect pipeline structure:
# Get number of processors
print(f"Pipeline has {len(pipeline)} processors")
# Get processor by index
first_processor = pipeline[0]
# Get human-readable description
print(pipeline.describe())
# Serialize to dictionary
pipeline_dict = pipeline.to_dict()
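len(pipeline) and pipeline[0] work on any Python class that defines __len__ and __getitem__. This hypothetical InspectablePipeline shows those hooks next to illustrative describe() and to_dict() methods; the actual output format of FACETpy's describe() and to_dict() is not reproduced here.

```python
from typing import Any, Dict, List


class Step:
    """Hypothetical processor with a name and constructor parameters."""

    def __init__(self, name: str, **params: Any) -> None:
        self.name = name
        self.params = params


class InspectablePipeline:
    def __init__(self, steps: List[Step], name: str = "Pipeline") -> None:
        self.steps = steps
        self.name = name

    def __len__(self) -> int:  # enables len(pipeline)
        return len(self.steps)

    def __getitem__(self, index: int) -> Step:  # enables pipeline[0]
        return self.steps[index]

    def describe(self) -> str:
        lines = [f"{self.name} ({len(self)} steps)"]
        lines += [f"  {i}. {s.name} {s.params}" for i, s in enumerate(self.steps, 1)]
        return "\n".join(lines)

    def to_dict(self) -> Dict[str, Any]:
        # Plain dicts and lists only, so the result is easy to serialize.
        return {"name": self.name,
                "processors": [{"name": s.name, "params": s.params} for s in self.steps]}


pipe = InspectablePipeline([Step("TriggerDetector", regex=r"\b1\b"),
                            Step("AASCorrection", window_size=30)], name="Demo")
print(len(pipe), pipe[0].name)  # 2 TriggerDetector
print(pipe.describe())
```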
Validation
Validate all processors before running:
# Check if pipeline can run with given context
errors = pipeline.validate_all(context)
if errors:
    print("Validation errors:")
    for error in errors:
        print(f"  - {error}")
else:
    print("Pipeline is valid")
    result = pipeline.run(initial_context=context)
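The value of validate_all is that it reports every problem up front instead of stopping at the first failure mid-run. A minimal sketch, assuming each processor exposes a validate(ctx) method that returns an error message or None; ToyAAS and this validate_all are hypothetical. Because the toy checks only the initial context, it cannot see data that earlier steps would produce later.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class ToyAAS:
    """Hypothetical processor that validates its parameters and its input."""
    window_size: int

    def validate(self, ctx: Dict[str, Any]) -> Optional[str]:
        # Return a message instead of raising, so every problem gets reported.
        if "raw" not in ctx:
            return "ToyAAS: context has no raw data"
        if self.window_size <= 0:
            return f"ToyAAS: window_size must be positive, got {self.window_size}"
        return None


def validate_all(processors: List[Any], ctx: Dict[str, Any]) -> List[str]:
    """Ask each processor to check the context before anything runs."""
    messages = [p.validate(ctx) for p in processors]
    return [m for m in messages if m is not None]


print(validate_all([ToyAAS(30), ToyAAS(0)], {"raw": [0.0, 1.0]}))
# ['ToyAAS: window_size must be positive, got 0']
print(validate_all([ToyAAS(30)], {}))
# ['ToyAAS: context has no raw data']
```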
Pipeline Builder
The PipelineBuilder provides a fluent interface for constructing pipelines:
from facet.core import PipelineBuilder
# The processor classes used below must be imported too (see the earlier examples)
builder = PipelineBuilder(name="My Workflow")
pipeline = (builder
            .add(Loader(path="data.edf"))
            .add(TriggerDetector(regex=r"\b1\b"))
            .add(UpSample(factor=10))
            .add(AASCorrection(window_size=30))
            .add(DownSample(factor=10))
            .add(EDFExporter(path="output.edf"))
            .build())
Conditional Addition
Add processors conditionally:
use_anc = True
use_pca = False
pipeline = (PipelineBuilder()
            .add(TriggerDetector(regex=r"\b1\b"))
            .add(AASCorrection(window_size=30))
            .add_if(use_anc, ANCCorrection(filter_order=5))
            .add_if(use_pca, PCACorrection(n_components=0.95))
            .build())
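A builder with add_if only needs to append conditionally and return itself either way so the chain continues. ToyBuilder below is a hypothetical sketch with strings in place of processors.

```python
from typing import Any, List


class ToyBuilder:
    """Hypothetical builder: add() always appends, add_if() only when true."""

    def __init__(self, name: str = "Pipeline") -> None:
        self.name = name
        self._steps: List[Any] = []

    def add(self, step: Any) -> "ToyBuilder":
        self._steps.append(step)
        return self

    def add_if(self, condition: bool, step: Any) -> "ToyBuilder":
        if condition:
            self._steps.append(step)
        return self  # return self either way so the chain continues

    def build(self) -> List[Any]:
        return list(self._steps)  # a copy, so later builder calls don't change it


use_anc, use_pca = True, False
steps = (ToyBuilder()
         .add("TriggerDetector")
         .add("AASCorrection")
         .add_if(use_anc, "ANCCorrection")
         .add_if(use_pca, "PCACorrection")
         .build())
print(steps)  # ['TriggerDetector', 'AASCorrection', 'ANCCorrection']
```

In the conditional example above, PCACorrection(n_components=0.95) is still constructed when use_pca is False, because Python evaluates arguments before add_if runs; if construction is expensive, a plain if statement avoids it.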
Common Pipeline Patterns
Standard Correction Pipeline
from facet import create_standard_pipeline
pipeline = create_standard_pipeline(
    input_path="data.edf",
    output_path="corrected.edf",
    trigger_regex=r"\b1\b",
    upsample_factor=10,
    use_anc=True,
    use_pca=False
)
result = pipeline.run()
Custom Correction Pipeline
pipeline = Pipeline([
    Loader(path="data.edf", preload=True),
    TriggerDetector(regex=r"\b1\b"),
    UpSample(factor=10),
    TriggerAligner(ref_trigger_index=0),
    AASCorrection(window_size=30, correlation_threshold=0.975),
    ANCCorrection(filter_order=5, hp_freq=1.0),
    DownSample(factor=10),
    HighPassFilter(freq=0.5),
    SNRCalculator(),
    RMSCalculator(),
    MetricsReport(),
    EDFExporter(path="corrected.edf", overwrite=True)
], name="Custom Correction")
Batch Processing Pipeline
Process multiple files:
import glob
from pathlib import Path
# Pipeline and the processors are imported as in the earlier examples
# Create pipeline template
def create_pipeline(input_path, output_path):
    return Pipeline([
        Loader(path=input_path, preload=True),
        TriggerDetector(regex=r"\b1\b"),
        AASCorrection(window_size=30),
        EDFExporter(path=output_path, overwrite=True)
    ])
# Process all files
output_dir = Path("corrected")
output_dir.mkdir(parents=True, exist_ok=True)  # make sure the target folder exists
input_files = sorted(glob.glob("data/*.edf"))
for input_file in input_files:
    output_file = str(output_dir / f"{Path(input_file).stem}_corrected.edf")
    pipeline = create_pipeline(input_file, output_file)
    result = pipeline.run()
    print(f"{input_file}: {'✓' if result.success else '✗'}")
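The batch loop above can be extended to collect outcomes instead of only printing them. The helpers corrected_path and summarize are hypothetical; the dict passed to summarize would map each input file to its result.success.

```python
from pathlib import Path
from typing import Dict


def corrected_path(input_file: str, out_dir: str = "corrected") -> Path:
    """data/sub-01.edf -> corrected/sub-01_corrected.edf"""
    return Path(out_dir) / f"{Path(input_file).stem}_corrected.edf"


def summarize(outcomes: Dict[str, bool]) -> str:
    """Count successes and list the files that failed, in sorted order."""
    failed = sorted(name for name, ok in outcomes.items() if not ok)
    summary = f"{len(outcomes) - len(failed)}/{len(outcomes)} files succeeded"
    return summary + (f"; failed: {', '.join(failed)}" if failed else "")


print(corrected_path("data/sub-01.edf").as_posix())  # corrected/sub-01_corrected.edf
print(summarize({"a.edf": True, "b.edf": False}))    # 1/2 files succeeded; failed: b.edf
```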
Evaluation-Only Pipeline
Evaluate already corrected data:
pipeline = Pipeline([
    Loader(path="corrected.edf", preload=True),
    TriggerDetector(regex=r"\b1\b"),
    SNRCalculator(),
    RMSCalculator(),
    MedianArtifactCalculator(),
    MetricsReport()
], name="Evaluation")
result = pipeline.run()
if result.success:
    metrics = result.context.metadata.custom['metrics']
Error Handling
Handling Pipeline Failures
result = pipeline.run()
if not result.success:
    print(f"Pipeline failed: {result.error}")
    # Check how far the pipeline got before it failed
    if result.context:
        history = result.context.get_history()
        print(f"Completed {len(history)} steps before failure")
    # Re-raise exception if needed
    raise result.error
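If every successful processor appends exactly one history entry (an assumption this guide does not state), the length of the history is also the index of the processor that failed. The toy run loop below demonstrates that bookkeeping; run_tracking_history and failing_step_name are hypothetical helpers, not FACETpy API.

```python
from typing import Any, Callable, List, Optional, Tuple

Step = Tuple[str, Callable[[Any], Any]]


def run_tracking_history(steps: List[Step], ctx: Any) -> Tuple[List[str], Optional[Exception]]:
    """Append one history entry per successful step; stop at the first failure."""
    history: List[str] = []
    for name, func in steps:
        try:
            ctx = func(ctx)
        except Exception as exc:
            return history, exc
        history.append(name)
    return history, None


def failing_step_name(steps: List[Step], history: List[str]) -> Optional[str]:
    # One entry per completed step means len(history) indexes the failed step.
    return steps[len(history)][0] if len(history) < len(steps) else None


def bad_correction(_: Any) -> Any:
    raise RuntimeError("too few triggers")


steps: List[Step] = [
    ("Loader", lambda c: c),
    ("TriggerDetector", lambda c: c),
    ("AASCorrection", bad_correction),
]
history, error = run_tracking_history(steps, {})
print(len(history), failing_step_name(steps, history), error)  # 2 AASCorrection too few triggers
```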
Try-Except Pattern
try:
    result = pipeline.run()
    if result.success:
        print("Success!")
    else:
        print(f"Failed: {result.error}")
except Exception as e:
    print(f"Unexpected error: {e}")
Performance Tips
Use Parallel Processing. Enable parallelization for multi-channel data:
result = pipeline.run(parallel=True, n_jobs=-1)
Preload Data. Load data into memory for faster processing:
Loader(path="data.edf", preload=True)
Optimize Window Size. Larger AAS window sizes are faster but less adaptive:
AASCorrection(window_size=30) # Good balance
Reduce Upsampling. Use a lower upsampling factor if your precision requirements allow it:
UpSample(factor=5) # Instead of 10
Profile Pipeline. Check execution times:
result = pipeline.run()
print(f"Total time: {result.execution_time:.2f}s")
for step in result.context.get_history():
    print(f"{step.name}: {step.timestamp}")
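The profiling loop above prints raw timestamps. To turn them into per-step durations, subtract consecutive timestamps. This sketch assumes each timestamp is a float in seconds recorded when its step finished; the guide does not specify the type or meaning of step.timestamp, so adapt it if needed (for datetime objects, (b - a).total_seconds() gives the difference in seconds). step_durations is a hypothetical helper.

```python
from typing import List, Tuple


def step_durations(start: float, stamps: List[Tuple[str, float]]) -> List[Tuple[str, float]]:
    """Turn per-step finish times into per-step durations.

    Assumes each timestamp marks when a step finished (float seconds), so a
    step's duration is its timestamp minus the previous one.
    """
    durations = []
    previous = start
    for name, stamp in stamps:
        durations.append((name, stamp - previous))
        previous = stamp
    return durations


stamps = [("Loader", 100.5), ("TriggerDetector", 101.0), ("AASCorrection", 104.25)]
for name, seconds in step_durations(100.0, stamps):
    print(f"{name}: {seconds:.2f}s")
# Loader: 0.50s
# TriggerDetector: 0.50s
# AASCorrection: 3.25s
```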
Best Practices
Name Your Pipelines
Pipeline([...], name="Descriptive Name")
Validate Before Running
errors = pipeline.validate_all(context)
if not errors:
    result = pipeline.run(initial_context=context)
Check Results
if result.success:
    # Process results
    pass
else:
    # Handle error
    pass
Use Type Hints
from facet.core import Pipeline, PipelineResult
def process_file(path: str) -> PipelineResult:
    pipeline = Pipeline([...])
    return pipeline.run()
Log Progress
FACETpy still records every detail with loguru (see logs/*.log), but the console now streams a Rich-powered dashboard that tracks processor states, durations, and a live progress bar. This interactive view is enabled by default: just run your pipeline and watch the terminal update in place.
Prefer the legacy line-by-line console output? Set FACET_CONSOLE_MODE=classic (or legacy) before starting Python and you’ll get the traditional loguru sink back, while file logging remains untouched.
Unix (macOS/Linux):
FACET_CONSOLE_MODE=classic python my_pipeline.py
Windows (PowerShell):
$env:FACET_CONSOLE_MODE = "classic"; python my_pipeline.py
You can still log explicitly with loguru, both in your own scripts and inside processors:
from loguru import logger
logger.info("Starting pipeline")
result = pipeline.run()
logger.info(f"Completed in {result.execution_time:.2f}s")
Next Steps
Learn about processors in detail in the Processors Guide
Explore Parallel and Channel-Sequential Processing capabilities
Create Custom Processors for your needs
Check out Examples for complete workflows