Parallel and Channel-Sequential Processing

FACETpy supports multiple execution modes for multi-channel data. This guide explains when to use channel-sequential execution for memory efficiency and when to use parallelization for throughput.

Overview

FACETpy offers three execution strategies:

  1. Channel-sequential execution (recommended default) - Run a full channel-wise processor chain per channel to minimize peak memory.

  2. Processor-level parallelization - Split channels across CPU cores for maximum speed when sufficient RAM is available.

  3. Pipeline-level parallelization - Process multiple files concurrently.

Rule of thumb: start with channel_sequential=True for large recordings, upsampling-heavy pipelines, or constrained memory. Switch to parallel=True when speed is the primary goal and memory is not a bottleneck.

Quick Start

Start with channel-sequential execution:

from facet.core import Pipeline
from facet.io import Loader
from facet.preprocessing import TriggerDetector
from facet.correction import AASCorrection

pipeline = Pipeline([
    Loader(path="data.edf", preload=True),
    TriggerDetector(regex=r"\b1\b"),
    AASCorrection(window_size=30)
])

# Recommended for memory-sensitive workloads
result = pipeline.run(channel_sequential=True)

If memory headroom is sufficient and you want maximum throughput:

# Use all CPU cores for eligible processors
result = pipeline.run(parallel=True, n_jobs=-1)

Channel-Sequential Execution

Why It Matters

Channel-sequential execution is a core FACETpy feature for memory-efficient processing. Instead of creating large intermediate arrays for all channels at once, FACETpy runs one data channel through the entire chain of consecutive channel-wise processors before moving on to the next channel.

This matters most for pipelines that combine upsampling with artifact correction (for example, UpSample -> AASCorrection -> DownSample), because upsampling by a factor of 10 makes each intermediate channel ten times larger than the input.

How It Works

With channel_sequential=True, consecutive processors marked as channel_wise=True (or run_once=True) are merged into one per-channel execution batch.

for each channel:
    channel -> HP-filter -> UpSample -> AAS -> DownSample -> store

Key properties:

  • Full high-sampling-rate intermediate data is not materialized for all channels at once.

  • run_once=True processors execute only for the first channel and their metadata is reused for the remaining channels.

  • Non-data channels (for example stim/misc) are passed through or resampled as needed.
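The per-channel loop above can be sketched in plain NumPy. This is a toy model rather than FACETpy's executor: `upsample`, `correct`, and `downsample` are hypothetical stand-ins (linear interpolation, mean removal, and decimation without an anti-aliasing filter) chosen only to show that the upsampled intermediate exists for one channel at a time.

```python
import numpy as np


def upsample(x: np.ndarray, factor: int) -> np.ndarray:
    """Hypothetical stand-in for UpSample: linear interpolation onto a denser grid."""
    t_old = np.arange(x.size)
    t_new = np.arange(x.size * factor) / factor
    return np.interp(t_new, t_old, x)


def correct(x: np.ndarray) -> np.ndarray:
    """Hypothetical stand-in for an artifact-correction step such as AAS."""
    return x - x.mean()


def downsample(x: np.ndarray, factor: int) -> np.ndarray:
    """Hypothetical stand-in for DownSample: keep every `factor`-th sample."""
    return x[::factor]


def run_channel_sequential(data: np.ndarray, factor: int) -> np.ndarray:
    """Apply the whole chain to one channel before moving to the next.

    At any moment only one upsampled channel (n_samples * factor values) is
    alive, instead of an (n_channels, n_samples * factor) array.
    """
    out = np.empty_like(data)
    for ch in range(data.shape[0]):
        x = upsample(data[ch], factor)
        x = correct(x)
        out[ch] = downsample(x, factor)
    return out


data = np.vstack([np.full(8, 3.0), np.arange(8.0)])
corrected = run_channel_sequential(data, factor=4)
print(corrected.shape)  # (2, 8)
```

The output has the input's shape, but the largest temporary array is a single upsampled channel.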

When to Prefer Channel-Sequential

Use channel_sequential=True when:

  • recordings are long,

  • channel count is high,

  • upsampling factors are high,

  • runs fail or become unstable due to memory pressure.

Enabling Channel-Sequential Execution

from facet import (
    Pipeline, Loader, EDFExporter,
    TriggerDetector, HighPassFilter, LowPassFilter,
    UpSample, DownSample, AASCorrection,
)

pipeline = Pipeline([
    Loader(path="data.edf", preload=True),
    TriggerDetector(regex=r"\b1\b"),    # serial/global step
    HighPassFilter(freq=1.0),           # channel-wise
    UpSample(factor=10),                # channel-wise
    AASCorrection(window_size=30),      # channel-wise
    DownSample(factor=10),              # channel-wise
    LowPassFilter(freq=70),             # channel-wise
    EDFExporter(path="corrected.edf", overwrite=True),
])

result = pipeline.run(channel_sequential=True)

Processor-Level Parallelization

How It Works

When a processor is marked parallel_safe = True and channel_wise = True, the ParallelExecutor can automatically split its work across channels:

Diagram showing how channels are split, processed in parallel, and merged back.
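The split-process-merge pattern in the diagram can be sketched with NumPy and the standard library. This is not FACETpy's ParallelExecutor: `split_channel_groups`, `run_split_merge`, and `demean` are hypothetical names, and the sketch uses a thread pool so it runs anywhere, whereas the default FACETpy backend uses separate processes.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def split_channel_groups(n_channels: int, n_jobs: int) -> list[np.ndarray]:
    """Split channel indices into at most n_jobs contiguous, non-empty groups."""
    groups = np.array_split(np.arange(n_channels), n_jobs)
    return [g for g in groups if g.size > 0]


def demean(block: np.ndarray) -> np.ndarray:
    """A channel-wise operation: each output row depends only on its input row."""
    return block - block.mean(axis=1, keepdims=True)


def run_split_merge(data: np.ndarray, func, n_jobs: int = 4) -> np.ndarray:
    groups = split_channel_groups(data.shape[0], n_jobs)
    out = np.empty_like(data)

    def work(idx: np.ndarray):
        # Each worker sees only its own channels (fancy indexing makes a copy)
        return idx, func(data[idx])

    with ThreadPoolExecutor(max_workers=len(groups)) as pool:
        for idx, block in pool.map(work, groups):
            out[idx] = block  # merge: write results back in the original channel order
    return out


rng = np.random.default_rng(0)
data = rng.normal(size=(10, 1000))
merged = run_split_merge(data, demean, n_jobs=4)
print(np.allclose(merged, demean(data)))  # True
```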

Enabling Parallelization

from facet.core import Pipeline
from facet.io import Loader
from facet.preprocessing import TriggerDetector
from facet.correction import AASCorrection

pipeline = Pipeline([
    Loader(path="data.edf", preload=True),
    TriggerDetector(regex=r"\b1\b"),
    AASCorrection(window_size=30)  # parallel_safe = True
])

# Speed-oriented mode (higher memory usage than channel-sequential)
result = pipeline.run(
    parallel=True,  # Enable parallelization
    n_jobs=-1       # Use all CPU cores
)

Specifying Number of Jobs

# Use all cores
result = pipeline.run(parallel=True, n_jobs=-1)

# Use 4 cores
result = pipeline.run(parallel=True, n_jobs=4)

# Use half of available cores (at least one, even on single-core machines)
import multiprocessing
n_cores = max(1, multiprocessing.cpu_count() // 2)
result = pipeline.run(parallel=True, n_jobs=n_cores)

Which Processors Support Parallelization?

Check the parallel_safe flag:

from facet.correction import AASCorrection
from facet.preprocessing import TriggerDetector

aas = AASCorrection(window_size=30)
detector = TriggerDetector(regex=r"\b1\b")

print(f"AAS parallel safe: {aas.parallel_safe}")  # True
print(f"Detector parallel safe: {detector.parallel_safe}")  # False

Processors marked parallel_safe = True:

  • AASCorrection

  • PCACorrection

  • HighPassFilter

  • LowPassFilter

  • BandPassFilter

  • NotchFilter

  • UpSample / DownSample / Resample

  • TriggerAligner / SliceAligner / SubsampleAligner (with run_once=True)

Processors that are NOT parallel safe:

  • TriggerDetector (operates on annotations)

  • MissingTriggerDetector / QRSTriggerDetector (stateful trigger operations)

  • ANCCorrection (currently marked parallel_safe = False)

  • Loader/EDFExporter (I/O operations)

  • SNRCalculator / RMSCalculator / MetricsReport (aggregation/reporting steps)

ParallelExecutor

The ParallelExecutor handles channel splitting automatically:

from facet.core import ParallelExecutor

# Manual usage (usually not needed).
# `processor` is any Processor instance and `input_context` a ProcessingContext.
executor = ParallelExecutor(n_jobs=4)
result_context = executor.execute(processor, input_context)

Channel-wise Execution

Channel-wise execution is the primary parallelization strategy in FACETpy. Each EEG channel (or group of channels) is processed independently in its own worker, and the results are merged back into a single ProcessingContext before the next pipeline step.

Opting In

A processor opts in by setting two class-level flags:

class MyProcessor(Processor):
    parallel_safe = True   # no cross-channel dependencies; safe for worker processes
    channel_wise  = True   # can operate on a single-channel subset context

parallel_safe and channel_wise are independent concepts. channel_wise is the flag used by both the parallel executor (parallel=True) and the channel-sequential executor (channel_sequential=True). In contrast, parallel_safe only affects multiprocessing mode.
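The flag rules above can be summarized as a small decision function. This is a hypothetical model written from this page's description, not FACETpy's dispatch code: the names `Flags` and `execution_mode` and the returned labels are illustrative, and it assumes that under parallel=True a processor is split only when channel_wise is set, with parallel_safe additionally required for the multiprocessing backend.

```python
from dataclasses import dataclass


@dataclass
class Flags:
    """Hypothetical container mirroring the class-level processor flags."""
    channel_wise: bool = False
    parallel_safe: bool = False
    run_once: bool = False


def execution_mode(flags: Flags, *, parallel: bool = False,
                   channel_sequential: bool = False,
                   backend: str = "multiprocessing") -> str:
    """Hypothetical summary of how the run modes treat processor flags."""
    if channel_sequential:
        # channel_wise or run_once processors join the per-channel batch
        if flags.channel_wise or flags.run_once:
            return "per-channel batch"
        return "serial"
    if parallel and flags.channel_wise and backend != "serial":
        # Assumption: parallel_safe only matters when workers are separate processes
        if backend == "threading" or flags.parallel_safe:
            return "split across workers"
    return "serial"


aas_like = Flags(channel_wise=True, parallel_safe=True)
detector_like = Flags()
print(execution_mode(aas_like, parallel=True))       # split across workers
print(execution_mode(detector_like, parallel=True))  # serial
```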

Built-in processors that are channel-wise safe:

  • AASCorrection: the artifact template is computed per channel independently.

  • PCACorrection: PCA is applied per channel without cross-channel reads.

  • HighPassFilter / LowPassFilter / BandPassFilter / NotchFilter: FIR/IIR filters operate on each channel’s time series in isolation.

  • UpSample / DownSample: resampling is applied independently to each channel.

  • TriggerAligner: reads one reference channel; because it has run_once=True, its metadata is computed only once.

Processors that are not channel-wise:

  • TriggerDetector: reads annotations shared across all channels.

  • Loader / EDFExporter: file I/O is inherently serial.

  • SNRCalculator / RMSCalculator: these aggregate across channels, so splitting would produce partial metrics.

Execution Flow

Diagram of parallel execution flow with split, per-worker processing, and merge.

Enabling it via the Pipeline

Pass parallel=True to Pipeline.run(...) for speed, or channel_sequential=True for memory-optimized per-channel execution. Processors that do not set channel_wise = True are silently executed serially — no special handling required.

from facet import (
    Pipeline, Loader, EDFExporter,
    TriggerDetector, HighPassFilter, LowPassFilter,
    UpSample, DownSample, AASCorrection,
)

pipeline = Pipeline([
    Loader(path="data.edf", preload=True),
    TriggerDetector(regex=r"\b1\b"),   # serial
    HighPassFilter(freq=1.0),           # channel-wise
    UpSample(factor=10),                # channel-wise
    AASCorrection(window_size=30),      # channel-wise
    DownSample(factor=10),              # channel-wise
    LowPassFilter(freq=70),             # channel-wise
    EDFExporter(path="corrected.edf", overwrite=True),
])

# Choose one mode per run:
result = pipeline.run(channel_sequential=True)      # memory-optimized default
# result = pipeline.run(parallel=True, n_jobs=-1)   # speed-oriented alternative

Choosing a Backend

You can also use ParallelExecutor directly to pick a specific backend:

from facet.core import ParallelExecutor

executor = ParallelExecutor(
    n_jobs=4,
    backend="multiprocessing",  # "multiprocessing" | "threading" | "serial"
    verbose=True,
)
# `processor` is a Processor instance; `context` is the ProcessingContext to process
result_context = executor.execute(processor, context)

Available backends:

  • multiprocessing (default): each worker is a separate process, giving true CPU parallelism. Best for compute-heavy channel-wise processors (AAS, PCA, filtering, resampling).

  • threading: workers share memory and start faster. Pure-Python code is limited by the GIL, but this backend can help when processors spend most of their time in NumPy or other C extensions that release the GIL.

  • serial: disables splitting. Useful for debugging, or for profiling a baseline without parallelization overhead.
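The three backend names correspond to familiar standard-library pools. The sketch below is not FACETpy code; `map_blocks` is a hypothetical helper that only shows what each choice implies: separate processes, threads in one process, or a plain loop.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def map_blocks(func, blocks, backend: str = "multiprocessing", n_jobs: int = 4):
    """Apply func to each block with the chosen backend; results keep input order."""
    if backend == "serial":
        # No pool at all: a baseline without any parallelization overhead
        return [func(b) for b in blocks]
    if backend == "threading":
        pool_cls = ThreadPoolExecutor   # shared memory, subject to the GIL
    elif backend == "multiprocessing":
        pool_cls = ProcessPoolExecutor  # separate processes; func and blocks must be picklable
    else:
        raise ValueError(f"unknown backend: {backend!r}")
    with pool_cls(max_workers=n_jobs) as pool:
        return list(pool.map(func, blocks))


print(map_blocks(sum, [[1, 2], [3, 4]], backend="threading"))  # [3, 7]
```

With the multiprocessing backend, call the helper from under an `if __name__ == "__main__":` guard on platforms that start workers with the spawn method (Windows and macOS).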

Writing a Custom Channel-wise Processor

Any processor whose computation for channel A is independent of channel B can opt in. A minimal example:

import numpy as np
from facet.core import Processor, ProcessingContext

class ChannelZScoreNormalizer(Processor):
    """Per-channel z-score normalisation — parallel-safe."""

    name        = "channel_zscore_normalizer"
    description = "Normalise each channel to zero mean and unit variance"

    requires_raw  = True
    modifies_raw  = True
    parallel_safe = True
    channel_wise  = True

    def process(self, context: ProcessingContext) -> ProcessingContext:
        raw  = context.get_raw().copy()
        data = raw._data                           # [n_channels, n_samples]

        mean = data.mean(axis=1, keepdims=True)
        std  = data.std(axis=1, keepdims=True)
        std  = np.where(std == 0, 1.0, std)        # guard against flat channels

        raw._data = (data - mean) / std
        return context.with_raw(raw)

This processor will be split channel-wise by ParallelExecutor exactly like AASCorrection. No other changes are required.
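One way to test the "channel A is independent of channel B" condition is to compare processing all channels at once with processing each channel on its own. The helper below is a hypothetical test utility that works on plain NumPy arrays rather than ProcessingContext objects; `zscore` mirrors the normalizer above, and `common_average_reference` is a counterexample that mixes channels.

```python
import numpy as np


def zscore(data: np.ndarray) -> np.ndarray:
    mean = data.mean(axis=1, keepdims=True)
    std = data.std(axis=1, keepdims=True)
    return (data - mean) / np.where(std == 0, 1.0, std)


def common_average_reference(data: np.ndarray) -> np.ndarray:
    # Subtracts the mean across channels, so every output row depends on all inputs
    return data - data.mean(axis=0, keepdims=True)


def behaves_channel_wise(func, data: np.ndarray) -> bool:
    """True if processing channels one at a time reproduces the all-channel result."""
    full = func(data)
    per_channel = np.vstack([func(data[ch:ch + 1]) for ch in range(data.shape[0])])
    return bool(np.allclose(full, per_channel))


rng = np.random.default_rng(42)
data = rng.normal(size=(4, 500))
print(behaves_channel_wise(zscore, data))                    # True
print(behaves_channel_wise(common_average_reference, data))  # False
```

Passing this check on one dataset is evidence rather than proof, but failing it is a clear sign that a processor should not set channel_wise = True.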

See examples/channelwise_execution.py for a complete runnable example including benchmark helpers and backend comparisons.

Pipeline-Level Parallelization

Process Multiple Files

Use Python’s concurrent.futures to process multiple files in parallel:

import concurrent.futures
from pathlib import Path

from facet import Pipeline, Loader, TriggerDetector, AASCorrection, EDFExporter

OUTPUT_DIR = Path("corrected")

def process_file(input_path):
    """Process a single file and write the corrected recording."""
    output_path = OUTPUT_DIR / f"{Path(input_path).stem}_corrected.edf"

    pipeline = Pipeline([
        Loader(path=str(input_path), preload=True),
        TriggerDetector(regex=r"\b1\b"),
        AASCorrection(window_size=30),
        EDFExporter(path=str(output_path), overwrite=True)
    ])

    return pipeline.run()

if __name__ == "__main__":  # required for process pools on spawn-based platforms (Windows, macOS)
    OUTPUT_DIR.mkdir(exist_ok=True)
    input_files = sorted(Path("data").glob("*.edf"))

    # Process multiple files in parallel
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(process_file, input_files))

    # Check results
    for input_file, result in zip(input_files, results):
        status = "✓" if result.success else "✗"
        print(f"{status} {input_file.name}")

Thread Pool vs Process Pool

ThreadPoolExecutor - Faster startup, shared memory, but Python-level work is limited by the GIL:

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_file, input_files))

ProcessPoolExecutor - True parallelism, isolated memory:

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_file, input_files))

Use ProcessPoolExecutor for CPU-intensive work (recommended).

Batch Processing Example

from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path

from tqdm import tqdm

# process_file is the function defined in "Process Multiple Files" above

def process_with_progress(input_files, n_workers=4):
    """Process files in parallel and show a progress bar."""
    results = {}

    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        # Submit all jobs and remember which future belongs to which file
        future_to_file = {
            executor.submit(process_file, f): f
            for f in input_files
        }

        # Collect results in completion order
        for future in tqdm(as_completed(future_to_file),
                           total=len(input_files),
                           desc="Processing"):
            input_file = future_to_file[future]
            try:
                results[input_file] = future.result()
            except Exception as e:
                print(f"Error processing {input_file}: {e}")
                results[input_file] = None

    return results

if __name__ == "__main__":  # required for process pools on spawn-based platforms
    input_files = list(Path("data").glob("*.edf"))
    results = process_with_progress(input_files, n_workers=4)

Performance Considerations

When to Use Parallelization

Good use cases:

  • Multi-channel data (>10 channels)

  • Long recordings (>10 minutes)

  • Batch processing multiple files

  • CPU-intensive operations such as AAS and PCA (ANCCorrection is not parallel safe, so it benefits only from pipeline-level parallelization)

Not beneficial:

  • Single-channel data

  • Very short recordings (<1 minute)

  • Single file processing with few channels

  • I/O-bound operations

For memory-bound pipelines, prefer channel_sequential=True instead of parallel=True.

Optimal Number of Workers

import multiprocessing

# Get CPU count
n_cpus = multiprocessing.cpu_count()
print(f"Available CPUs: {n_cpus}")

# General guidelines:
# - Processor-level: use all cores (n_jobs=-1)
# - Pipeline-level: use n_cpus - 1 workers (leave one core for the system)

# Mixed workload: keep pipeline workers x jobs per file within the core budget
n_processor_jobs = 2  # fewer jobs per file so more files can run concurrently
n_pipeline_workers = max(1, (n_cpus - 1) // n_processor_jobs)
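When both levels are combined, the number of busy processes is roughly pipeline workers × jobs per file. The helper below (hypothetical, not part of FACETpy) splits a core budget so that this product never exceeds the cores you want to use.

```python
import multiprocessing


def split_core_budget(n_cpus: int, jobs_per_file: int, reserve: int = 1) -> tuple[int, int]:
    """Return (pipeline_workers, jobs_per_file) whose product fits in n_cpus - reserve."""
    budget = max(1, n_cpus - reserve)
    jobs_per_file = max(1, min(jobs_per_file, budget))
    pipeline_workers = max(1, budget // jobs_per_file)
    return pipeline_workers, jobs_per_file


workers, jobs = split_core_budget(multiprocessing.cpu_count(), jobs_per_file=2)
print(f"{workers} files at a time, n_jobs={jobs} each")
```

For example, 16 CPUs with two jobs per file gives 7 concurrent files (14 busy cores), leaving headroom for the system.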

Memory Usage

Parallel processing increases memory usage:

# Memory per worker ≈ raw data size + processing overhead

# Example: 32 channels, 5000 Hz, 10 min recording, float64 samples
# Memory ≈ 32 * 5000 * 600 * 8 bytes = 768 MB per worker

# With 4 workers: 4 * 768 MB ≈ 3.1 GB

# Monitor memory
import psutil
print(f"Available memory: {psutil.virtual_memory().available / 1e9:.1f} GB")

For the same processor chain, channel_sequential=True typically lowers peak memory because only one channel’s large intermediate representation is kept in memory at a time.
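The arithmetic above extends to upsampling pipelines. The function below is a rough model, not a measurement: it assumes float64 samples, counts only the raw array plus one working copy of the (possibly upsampled) data, and ignores extra copies made inside individual processors. It shows why channel_sequential=True helps: the upsampled intermediate is sized for one channel instead of all of them.

```python
def estimate_peak_bytes(n_channels: int, sfreq: float, duration_s: float,
                        upsample: int = 1, channel_sequential: bool = False,
                        bytes_per_sample: int = 8) -> float:
    """Rough peak-memory model: raw array + working copy at the upsampled rate."""
    n_samples = sfreq * duration_s
    raw_bytes = n_channels * n_samples * bytes_per_sample
    # Channel-sequential keeps one upsampled channel alive; otherwise all channels are upsampled
    channels_upsampled = 1 if channel_sequential else n_channels
    intermediate_bytes = channels_upsampled * n_samples * upsample * bytes_per_sample
    return raw_bytes + intermediate_bytes


for seq in (False, True):
    gb = estimate_peak_bytes(32, 5000, 600, upsample=10, channel_sequential=seq) / 1e9
    print(f"channel_sequential={seq}: ~{gb:.2f} GB")
# channel_sequential=False: ~8.45 GB
# channel_sequential=True: ~1.01 GB
```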

Overhead

Parallelization has overhead:

  • Starting worker processes

  • Splitting/merging data

  • Inter-process communication

Parallelization pays off only when processing time is much larger than this overhead.

# Rule of thumb:
# Parallelize if processing time > 10 seconds per file
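A simple model makes this trade-off concrete. It assumes the work splits perfectly across n workers and that parallel execution adds a fixed overhead (worker startup, splitting/merging, inter-process communication); both are simplifications, the 1.5 s overhead below is a made-up illustrative value, and real numbers should come from benchmarking.

```python
def expected_speedup(serial_s: float, n_workers: int, overhead_s: float) -> float:
    """Speedup of an ideal n-way split that pays a fixed overhead."""
    return serial_s / (serial_s / n_workers + overhead_s)


# Illustrative values only: 8 workers, 1.5 s of assumed overhead
for serial_s in (2.0, 10.0, 120.0):
    print(f"{serial_s:>6.1f} s serial -> {expected_speedup(serial_s, 8, 1.5):.2f}x with 8 workers")
```

Short jobs barely gain anything, while long jobs approach the ideal speedup of n.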

Benchmarking

Compare Performance

import time

# Serial processing
start = time.perf_counter()
result = pipeline.run(parallel=False, channel_sequential=False)
serial_time = time.perf_counter() - start

# Channel-sequential processing (memory-optimized)
start = time.perf_counter()
result = pipeline.run(channel_sequential=True)
channel_seq_time = time.perf_counter() - start

# Parallel processing
start = time.perf_counter()
result = pipeline.run(parallel=True, n_jobs=-1)
parallel_time = time.perf_counter() - start

# Compare runtimes
print(f"Serial:             {serial_time:.2f}s")
print(f"Channel-sequential: {channel_seq_time:.2f}s")
print(f"Parallel:           {parallel_time:.2f}s")

Scaling Analysis

import time

def benchmark_scaling(pipeline, context, max_workers=8):
    """Benchmark runtime for 1..max_workers parallel jobs."""
    results = {}

    for n_workers in range(1, max_workers + 1):
        start = time.perf_counter()
        pipeline.run(
            initial_context=context,
            parallel=True,
            n_jobs=n_workers
        )
        results[n_workers] = time.perf_counter() - start

    return results

# Run benchmark; `context` is a ProcessingContext prepared beforehand
scaling = benchmark_scaling(pipeline, context)

# Plot results
import matplotlib.pyplot as plt

workers = list(scaling.keys())
times = list(scaling.values())

plt.plot(workers, times, marker='o')
plt.xlabel('Number of Workers')
plt.ylabel('Processing Time (s)')
plt.title('Parallel Scaling')
plt.grid(True)
plt.show()
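Raw timings are easier to compare as speedup (T1 / Tn) and parallel efficiency (speedup / n). The hypothetical helper below assumes the `{n_workers: seconds}` dictionary returned by benchmark_scaling; the example timings are invented for illustration and are not FACETpy results.

```python
def scaling_summary(timings: dict[int, float]) -> dict[int, tuple[float, float]]:
    """Map n_workers -> (speedup relative to 1 worker, parallel efficiency)."""
    t1 = timings[1]
    return {n: (t1 / t, t1 / t / n) for n, t in sorted(timings.items())}


example = {1: 40.0, 2: 21.0, 4: 12.0, 8: 9.0}  # invented timings for illustration
for n, (speedup, eff) in scaling_summary(example).items():
    print(f"{n} workers: {speedup:.2f}x speedup, {eff:.0%} efficiency")
```

Falling efficiency as n grows usually means overhead or memory bandwidth, rather than computation, is starting to dominate.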

Best Practices

  1. Profile First

    Measure before optimizing:

    # Time your pipeline
    import time
    start = time.time()
    result = pipeline.run()
    print(f"Time: {time.time() - start:.2f}s")
    
  2. Start with Memory-Safe Mode

    Use channel-sequential first, then scale to parallel if needed:

    # Recommended starting point
    result = pipeline.run(channel_sequential=True)
    
    # If runtime is the bottleneck and memory is sufficient:
    result = pipeline.run(parallel=True, n_jobs=-1)
    
  3. Monitor Resources

    Watch CPU and memory:

    import psutil
    
    # Before processing
    cpu_percent = psutil.cpu_percent(interval=1)
    mem = psutil.virtual_memory()
    print(f"CPU: {cpu_percent}%, Memory: {mem.percent}%")
    
  4. Handle Errors

    Failures inside worker processes are easy to miss, so check both for exceptions and for a failed result:

    try:
        result = pipeline.run(parallel=True, n_jobs=-1)
        if not result.success:
            print(f"Pipeline failed: {result.error}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    
  5. Batch Appropriately

    Group files by size:

    from pathlib import Path

    files = list(Path("data").glob("*.edf"))

    # Process large files (> 1 GB) one at a time
    large_files = [f for f in files if f.stat().st_size > 1e9]

    # Process small files in parallel
    small_files = [f for f in files if f.stat().st_size <= 1e9]
    

Troubleshooting

Parallel Processing Not Working

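Check whether each processor opts in to channel splitting (multiprocessing needs both channel_wise and parallel_safe):

for proc in pipeline.processors:
    if not getattr(proc, "channel_wise", False):
        print(f"{proc.name} is not channel-wise and runs serially")
    elif not getattr(proc, "parallel_safe", False):
        print(f"{proc.name} is not parallel safe")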

Out of Memory Errors

First switch to channel-sequential mode. If needed, then reduce workers:

# Preferred fix for memory pressure
result = pipeline.run(channel_sequential=True)

# Use fewer workers
result = pipeline.run(parallel=True, n_jobs=2)

# Or disable both advanced modes
result = pipeline.run(parallel=False, channel_sequential=False)

Slower with Parallelization

Overhead may exceed benefit:

# n_channels, duration (in seconds) and memory_limited describe your recording
# and machine; set them yourself before choosing a mode.

# Use simple serial mode for very small datasets
if n_channels < 10 or duration < 60:
    result = pipeline.run(parallel=False, channel_sequential=False)
# Use memory-optimized channel-sequential mode for larger but memory-bound data
elif memory_limited:
    result = pipeline.run(channel_sequential=True)
else:
    result = pipeline.run(parallel=True, n_jobs=-1)

Advanced Topics

Custom Parallel Executors

Subclass ParallelExecutor to customize parallel execution:

from facet.core import ParallelExecutor

class CustomExecutor(ParallelExecutor):
    def __init__(self, n_jobs=-1, backend="multiprocessing"):
        # Forward the backend so the base class actually uses it
        super().__init__(n_jobs=n_jobs, backend=backend)

    def execute(self, processor, context):
        # Custom logic before/after the standard split-process-merge
        return super().execute(processor, context)

GPU Acceleration

FACETpy doesn’t currently support GPU acceleration, but you can create custom processors that use GPU libraries:

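import torch  # or cupy, jax, etc.

from facet.core import Processor, ProcessingContext

class GPUProcessor(Processor):
    name = "gpu_processor"
    requires_raw = True
    modifies_raw = True
    parallel_safe = False  # the GPU parallelizes internally; don't also split across CPU workers

    def process(self, context: ProcessingContext) -> ProcessingContext:
        raw = context.get_raw().copy()
        data = torch.from_numpy(raw._data).cuda()  # copy to the GPU (requires CUDA)

        # GPU processing
        result = self.gpu_function(data)

        # Copy back to CPU
        raw._data = result.cpu().numpy()
        return context.with_raw(raw)

    def gpu_function(self, data: torch.Tensor) -> torch.Tensor:
        # Placeholder for your own GPU code: here, remove each channel's mean
        return data - data.mean(dim=1, keepdim=True)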

Distributed Processing

For cluster environments, use Dask:

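from dask import delayed
from dask.distributed import Client

# Connect to cluster
client = Client('scheduler-address:8786')

# Create delayed tasks (process_file must be importable on the workers)
tasks = [delayed(process_file)(f) for f in input_files]

# Submit to the cluster, then wait for and collect the results
futures = client.compute(tasks)
results = client.gather(futures)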

Next Steps