Source code for facet.core.pipeline

"""
Pipeline Module

This module defines the Pipeline class for executing sequences of processors.

Author: FACETpy Team
Date: 2025-01-12
"""

from collections.abc import Callable
from typing import Any

from loguru import logger

from ..console import get_console
from ..console.progress import set_current_step_index
from .channel_sequential import ChannelSequentialExecutor
from .context import ProcessingContext
from .parallel import ParallelExecutor
from .processor import Processor


[docs] class PipelineError(Exception): """Base exception for pipeline-related errors.""" pass
[docs] class PipelineResult: """ Result of pipeline execution. Contains the final context and metadata about execution. """
[docs] def __init__( self, context: ProcessingContext | None, success: bool = True, error: Exception | None = None, execution_time: float = 0.0, failed_processor: str | None = None, failed_processor_index: int | None = None, ): """ Initialize pipeline result. Args: context: Final processing context success: Whether pipeline completed successfully error: Exception if pipeline failed execution_time: Total execution time in seconds """ self.context = context self.success = success self.error = error self.execution_time = execution_time self.failed_processor = failed_processor self.failed_processor_index = failed_processor_index
[docs] def get_context(self) -> ProcessingContext: """Get final processing context.""" return self.context
[docs] def get_raw(self): """Get final raw data (convenience method).""" return self.context.get_raw()
[docs] def get_history(self): """Get processing history.""" return self.context.get_history()
[docs] def was_successful(self) -> bool: """Check if pipeline succeeded.""" return self.success
@property def metrics(self) -> dict[str, Any]: """ Shortcut to evaluation metrics stored in context. Returns the ``metrics`` dict from ``context.metadata.custom``, or an empty dict if no metrics have been calculated yet. Example:: result = pipeline.run() print(result.metrics['snr']) """ if self.context is None: return {} return self.context.metadata.custom.get("metrics", {}) @property def metrics_df(self): """ Return scalar evaluation metrics as a ``pandas.Series``. Nested dicts (e.g. ``fft_allen``) are flattened with ``_`` separators. Returns ``None`` if pandas is not available. Example:: result = pipeline.run() print(result.metrics_df) """ try: import pandas as pd except ImportError: return None flat: dict[str, Any] = {} for k, v in self.metrics.items(): if isinstance(v, (int, float)): flat[k] = v elif isinstance(v, dict): for sub_k, sub_v in v.items(): if isinstance(sub_v, (int, float)): flat[f"{k}_{sub_k}"] = sub_v return pd.Series(flat, name=self.context.metadata.custom.get("pipeline_name", "metrics"))
[docs] def metric(self, name: str, default=None): """ Return a single evaluation metric by name. Shortcut for ``result.metrics.get(name, default)`` that avoids having to remember the dict key and provides a clean default. Args: name: Metric name (e.g. ``'snr'``, ``'rms_ratio'``). default: Value returned when the metric is absent. Example:: snr = result.metric('snr') print(f"SNR = {snr:.2f} dB") """ return self.metrics.get(name, default)
[docs] def print_metrics(self) -> None: """ Print a formatted table of all evaluation metrics. Uses *rich* for colour and alignment when available. Example:: result = pipeline.run() result.print_metrics() """ import numpy as np from rich import box from rich.console import Console as RichConsole from rich.panel import Panel from rich.table import Table from rich.text import Text metrics = self.metrics if not metrics: print("No metrics available — did you add evaluation processors?") return con = get_console().get_rich_console() or RichConsole(highlight=False) table = Table( box=None, show_header=True, padding=(0, 2), expand=True, show_edge=False, ) table.add_column("Metric", style="bold", ratio=3) table.add_column("Value", style="white", ratio=2, justify="left") table.add_column("", style="dim italic", ratio=1) def _section(title: str) -> None: table.add_row("", "", "") table.add_row(Text(title, style="bold yellow underline"), "", "") def _fmt_per_channel(val: list) -> str: arr = np.asarray(val, dtype=float) return f"mean {arr.mean():.3g} ± {arr.std():.3g} [dim](min {arr.min():.3g} – max {arr.max():.3g})[/]" def _color_snr(v: float) -> str: return "green" if v > 10 else ("yellow" if v > 3 else "red") def _color_ratio(v: float) -> str: return "green" if abs(v - 1.0) < 0.1 else ("yellow" if abs(v - 1.0) < 0.3 else "red") # --- Core scalar metrics --- core_keys = ("snr", "rms_ratio", "rms_residual", "median_artifact", "legacy_snr") if any(k in metrics for k in core_keys): _section("Core Metrics") if "snr" in metrics: snr = metrics["snr"] c = _color_snr(snr) table.add_row("SNR (Signal-to-Noise Ratio)", f"[{c}]{snr:.2f}[/]", "") if "rms_ratio" in metrics: table.add_row("RMS Ratio (improvement)", f"{metrics['rms_ratio']:.2f}", "×") if "rms_residual" in metrics: r = metrics["rms_residual"] c = _color_ratio(r) table.add_row("RMS Residual Ratio", f"[{c}]{r:.2f}[/]", "target: 1.0") if "median_artifact" in metrics: table.add_row("Median Artifact Amplitude", f"{metrics['median_artifact']:.2e}", "") if "median_artifact_ratio" in metrics: r = metrics["median_artifact_ratio"] c = "green" if abs(r - 1.0) < 0.2 else ("yellow" if abs(r - 1.0) < 0.6 else "red") table.add_row("Median Artifact Ratio", f"[{c}]{r:.2f}[/]", "target: 1.0") if "legacy_snr" in metrics: table.add_row("Legacy SNR", f"{metrics['legacy_snr']:.2f}", "") # --- Per-channel breakdowns --- per_ch = {k: v for k, v in metrics.items() if k.endswith("_per_channel") and isinstance(v, list)} if per_ch: _section("Per-Channel Summary (mean ± std, min – max)") for key, val in per_ch.items(): label = key.replace("_per_channel", "").replace("_", " ").title() table.add_row(label, _fmt_per_channel(val), "") # --- FFT Allen --- if "fft_allen" in metrics: _section("FFT Allen — Spectral Diff to Reference") for band, val in metrics["fft_allen"].items(): table.add_row(f"{band.capitalize()}", f"{val:.2f}%", "") # --- FFT Niazy --- if "fft_niazy" in metrics: _section("FFT Niazy — Power Ratio (Uncorr / Corr)") if "slice" in metrics["fft_niazy"]: harmonics = " ".join(f"[cyan]{k}[/]: {v:.2f}" for k, v in metrics["fft_niazy"]["slice"].items()) table.add_row("Slice Harmonics", harmonics, "dB") if "volume" in metrics["fft_niazy"]: harmonics = " ".join(f"[cyan]{k}[/]: {v:.2f}" for k, v in metrics["fft_niazy"]["volume"].items()) table.add_row("Volume Harmonics", harmonics, "dB") # --- Other unknown keys --- known = ( set(core_keys) | set(per_ch) | {"median_artifact_ratio", "median_artifact_reference", "fft_allen", "fft_niazy"} ) extras = {k: v for k, v in metrics.items() if k not in known} if extras: _section("Other") for key, val in extras.items(): label = key.replace("_", " ").title() if isinstance(val, float): formatted = f"{val:.4g}" elif isinstance(val, dict): formatted = " ".join( f"{k}: {v:.3g}" if isinstance(v, float) else f"{k}: {v}" for k, v in val.items() ) elif isinstance(val, list): formatted = _fmt_per_channel(val) if val and isinstance(val[0], (int, float)) else str(val) else: formatted = str(val) table.add_row(label, formatted, "") con.print() con.print( Panel( table, title="[bold white] Evaluation Metrics Report [/]", border_style="cyan", box=box.ROUNDED, padding=(1, 2), ) )
[docs] def print_summary(self) -> None: """ Print a one-line summary of the pipeline result. Shows success/failure, execution time, and any key metrics (SNR, RMS ratio, RMS residual) that were calculated. Example:: result = pipeline.run() result.print_summary() """ from rich.console import Console as RichConsole con = get_console().get_rich_console() or RichConsole() if self.success: parts = [f"[green]Done[/green] in {self.execution_time:.2f}s"] for name in ("snr", "rms_ratio", "rms_residual", "median_artifact"): val = self.metrics.get(name) if val is not None: parts.append(f"{name}={val:.3g}") con.print(" ".join(parts)) else: con.print(f"[red]Failed[/red] after {self.execution_time:.2f}s — {self.error}")
[docs] def plot(self, **kwargs): """ Plot the corrected data using ``RawPlotter`` defaults. Accepts any keyword arguments supported by ``RawPlotter``. Example:: result = pipeline.run() result.plot(channel="Fp1", start=5.0, duration=10.0) """ from ..evaluation import RawPlotter plotter = RawPlotter(**kwargs) plotter.execute(self.context)
[docs] def release_raw(self) -> None: """ Release the Raw data held by the context to free memory. After calling this, ``get_raw()`` and ``plot()`` will no longer work, but :attr:`metrics` and ``execution_time`` remain accessible. Useful when running batch jobs where you only need summary statistics. """ if self.context is not None: self.context._raw = None self.context._raw_original = None
def __repr__(self) -> str: status = "SUCCESS" if self.success else "FAILED" return f"PipelineResult({status}, time={self.execution_time:.2f}s)"
[docs] class BatchResult: """ Result of ``Pipeline.map(...)`` - a list of :class:`~facet.core.pipeline.PipelineResult` objects with built-in helpers for quick inspection. It behaves like a regular list (iteration, indexing, ``len``), so existing code that iterates over the return value of ``Pipeline.map()`` continues to work without changes. Example:: results = pipeline.map(files, loader_factory=lambda p: Loader(p)) results.print_summary() df = results.summary_df """
[docs] def __init__( self, results: list["PipelineResult"], labels: list[str] | None = None, ): self._results = results self._labels = labels or [f"input_{i}" for i in range(len(results))]
# ------------------------------------------------------------------ # List-like interface # ------------------------------------------------------------------ def __iter__(self): return iter(self._results) def __getitem__(self, index): return self._results[index] def __len__(self): return len(self._results) def __repr__(self): n_ok = sum(1 for r in self._results if r.success) return f"BatchResult({n_ok}/{len(self._results)} succeeded)" # ------------------------------------------------------------------ # Convenience helpers # ------------------------------------------------------------------
[docs] def print_summary(self) -> None: """ Print a formatted table with one row per input file. Columns include the file label, success/failure status, execution time, and any scalar metrics that were computed. Example:: results = pipeline.map(files, loader_factory=...) results.print_summary() """ from rich import box from rich.console import Console as RichConsole from rich.table import Table con = get_console().get_rich_console() or RichConsole(highlight=False) table = Table( title="Batch Results", show_header=True, header_style="bold cyan", box=box.SIMPLE_HEAVY, padding=(0, 1), ) table.add_column("File", style="bold", no_wrap=True) table.add_column("Status", justify="left") table.add_column("Time", justify="left") metric_names: list[str] = [] for r in self._results: for k, v in r.metrics.items(): if k not in metric_names and isinstance(v, (int, float)): metric_names.append(k) for m in metric_names: table.add_column(m, justify="left") for label, result in zip(self._labels, self._results, strict=False): status = "[green]OK[/green]" if result.success else "[red]FAIL[/red]" time_str = f"{result.execution_time:.2f}s" row: list[str] = [label, status, time_str] for m in metric_names: if result.success: val = result.metrics.get(m) row.append(f"{val:.3f}" if isinstance(val, float) else (str(val) if val is not None else "—")) else: row.append("—") table.add_row(*row) con.print(table)
@property def summary_df(self): """ Return a ``pandas.DataFrame`` with one row per input. Columns: ``file``, ``success``, ``execution_time``, plus one column per scalar metric. Returns ``None`` when *pandas* is not installed. """ try: import pandas as pd except ImportError: return None rows = [] for label, result in zip(self._labels, self._results, strict=False): row: dict[str, Any] = { "file": label, "success": result.success, "execution_time": result.execution_time, } if result.success and result.metrics_df is not None: row.update(result.metrics_df.to_dict()) rows.append(row) return pd.DataFrame(rows)
[docs] class Pipeline: """ Pipeline for executing sequences of processors. A pipeline orchestrates the execution of multiple processors in sequence, handles errors, provides progress tracking, and supports parallelization. Example:: pipeline = Pipeline([ Loader("data.edf"), HighPassFilter(freq=1.0), UpSample(factor=10), TriggerDetector(regex=r"\\btrigger\\b"), AASCorrection(), EDFExporter("output.edf") ]) result = pipeline.run() if result.was_successful(): print(f"Completed in {result.execution_time:.2f}s") Attributes: processors: List of processors to execute name: Optional pipeline name """
[docs] def __init__(self, processors: list[Processor | Callable], name: str | None = None): """ Initialize pipeline. Plain callables (``Callable[[ProcessingContext], ProcessingContext]``) are automatically wrapped in a :class:`~facet.core.LambdaProcessor` so they can be used as inline steps without ceremony:: pipeline = Pipeline([ Loader("data.edf"), HighPassFilter(1.0), lambda ctx: (print(ctx.get_sfreq()) or ctx), AASCorrection(), ]) Args: processors: List of :class:`~facet.core.Processor` instances **or** plain callables to execute in order. name: Optional pipeline name (for logging) """ self.processors = self._normalise_processors(processors) self.name = name or "Pipeline"
@staticmethod def _normalise_processors( items: list[Processor | Callable], _index_offset: int = 0, ) -> list[Processor]: """ Coerce each item to a :class:`Processor`. Plain callables are wrapped in a :class:`~facet.core.LambdaProcessor`. Anything else that is not a :class:`Processor` raises :exc:`TypeError`. """ from .processor import LambdaProcessor result: list[Processor] = [] for i, p in enumerate(items): if isinstance(p, Processor): result.append(p) elif callable(p): display_name = getattr(p, "__name__", None) or f"step_{_index_offset + i}" result.append(LambdaProcessor(name=display_name, func=p)) else: raise TypeError( f"Item at index {_index_offset + i} must be a Processor instance or a callable, got {type(p)}" ) return result def _validate_pipeline(self) -> None: """No-op — validation now happens in _normalise_processors.""" # ---------------------------------------------------------------------- # # Execution helpers # # ---------------------------------------------------------------------- # def _group_processors( self, parallel: bool, channel_sequential: bool, ) -> list[tuple[list[Processor], str]]: """ Partition processors into execution groups. Returns a list of ``(processors, mode)`` tuples where *mode* is one of ``'channel_sequential'``, ``'parallel'``, or ``'serial'``. In channel_sequential mode consecutive processors with ``channel_wise = True`` (or ``run_once = True``) are merged into a single ``'channel_sequential'`` group. This grouping is entirely independent of ``parallel_safe``. """ groups: list[tuple[list[Processor], str]] = [] i = 0 while i < len(self.processors): proc = self.processors[i] ch_eligible = getattr(proc, "channel_wise", False) or getattr(proc, "run_once", False) if channel_sequential and ch_eligible: batch: list[Processor] = [] while i < len(self.processors): p = self.processors[i] if getattr(p, "channel_wise", False) or getattr(p, "run_once", False): batch.append(p) i += 1 else: break groups.append((batch, "channel_sequential")) elif parallel and proc.parallel_safe: groups.append(([proc], "parallel")) i += 1 else: groups.append(([proc], "serial")) i += 1 return groups def _dispatch_step( self, processors: list[Processor], mode: str, context: ProcessingContext, n_jobs: int, ) -> ProcessingContext: """Execute one group of processors according to *mode*.""" if mode == "channel_sequential": return ChannelSequentialExecutor().execute(processors, context) if mode == "parallel": return ParallelExecutor(n_jobs=n_jobs).execute(processors[0], context) return processors[0].execute(context) # ---------------------------------------------------------------------- # # Public API # # ---------------------------------------------------------------------- #
[docs] def run( self, initial_context: ProcessingContext | None = None, parallel: bool = False, n_jobs: int = -1, channel_sequential: bool = False, show_progress: bool = True, ) -> PipelineResult: """ Execute the pipeline. Args: initial_context: Initial context (if None, first processor creates it) parallel: Enable parallel execution for compatible processors n_jobs: Number of parallel jobs (-1 for all CPUs) channel_sequential: Run consecutive channel-wise processors (``channel_wise = True``) as a single per-channel pass. For each channel the full local sequence executes before the next channel starts:: for each channel: channel → HP-filter → UpSample → AAS → DownSample → store The output array is pre-allocated at the final sfreq so the full high-sfreq intermediate data never exists for all channels at once. Processors with ``run_once = True`` (e.g. TriggerAligner) are included in the per-channel pass but only execute for the first channel; all subsequent channels skip them. This flag is independent of ``parallel_safe`` and has no relation to multiprocessing. Takes precedence over *parallel* for eligible processors. show_progress: Show progress bar Returns: PipelineResult containing final context and metadata """ import time start_time = time.time() console = get_console() n_procs = len(self.processors) execution_mode = "channel_sequential" if channel_sequential else "parallel" if parallel else "serial" console.set_pipeline_metadata( { "execution_mode": execution_mode, "n_jobs": "1" if channel_sequential else str(n_jobs), } ) console.start_pipeline( self.name, n_procs, step_names=[p.name for p in self.processors], ) logger.info(f"Starting pipeline: {self.name} ({n_procs} processors)") context = initial_context current_processor: tuple[int, Processor] | None = None try: step_offset = 0 for processors, mode in self._group_processors(parallel, channel_sequential): current_processor = (step_offset, processors[0]) label = " → ".join(p.name for p in processors) logger.info(f"[{step_offset + 1}/{n_procs}] {label}") for k, p in enumerate(processors): console.step_started(step_offset + k, p.name) set_current_step_index(step_offset) step_start = time.time() try: context = self._dispatch_step(processors, mode, context, n_jobs) finally: set_current_step_index(None) duration = time.time() - step_start for k, p in enumerate(processors): console.step_completed( step_offset + k, p.name, duration / len(processors), metrics={ "execution_mode": mode, "last_duration": f"{duration:.2f}s", }, ) step_offset += len(processors) execution_time = time.time() - start_time logger.info(f"Pipeline completed in {execution_time:.2f}s") console.pipeline_complete(True, execution_time) return PipelineResult(context=context, success=True, execution_time=execution_time) except Exception as e: execution_time = time.time() - start_time if current_processor: failed_idx, failed_proc = current_processor logger.error( f"Pipeline failed after {execution_time:.2f}s during " f"{failed_proc.name} (step {failed_idx + 1}/{n_procs}): {e}" ) else: logger.error(f"Pipeline failed after {execution_time:.2f}s: {e}") logger.opt(exception=e).debug("Exception details") console.pipeline_failed( execution_time, e, current_processor[0] if current_processor else None, current_processor[1].name if current_processor else None, ) return PipelineResult( context=context, success=False, error=e, execution_time=execution_time, failed_processor=current_processor[1].name if current_processor else None, failed_processor_index=current_processor[0] if current_processor else None, )
[docs] def add(self, processor: Processor | Callable) -> "Pipeline": """ Add a processor or callable to the pipeline (fluent API). Args: processor: :class:`~facet.core.Processor` instance or callable. Returns: Self for chaining """ [normalised] = self._normalise_processors([processor], _index_offset=len(self.processors)) self.processors.append(normalised) return self
[docs] def extend(self, processors: list[Processor | Callable]) -> "Pipeline": """ Extend pipeline with multiple processors or callables. Args: processors: List of processors or callables to add. Returns: Self for chaining """ self.processors.extend(self._normalise_processors(processors, _index_offset=len(self.processors))) return self
[docs] def insert(self, index: int, processor: Processor | Callable) -> "Pipeline": """ Insert a processor or callable at a specific position. Args: index: Position to insert processor: :class:`~facet.core.Processor` instance or callable. Returns: Self for chaining """ [normalised] = self._normalise_processors([processor], _index_offset=index) self.processors.insert(index, normalised) return self
[docs] def remove(self, index: int) -> "Pipeline": """ Remove processor at index. Args: index: Index to remove Returns: Self for chaining """ self.processors.pop(index) return self
[docs] def validate_all(self, context: ProcessingContext) -> list[str]: """ Validate all processors against a context. Useful for checking if a pipeline can run before actually running it. Args: context: Context to validate against Returns: List of validation error messages (empty if all valid) """ errors = [] for i, processor in enumerate(self.processors): try: processor.validate(context) except Exception as e: errors.append(f"Processor {i} ({processor.name}): {str(e)}") return errors
[docs] def describe(self) -> str: """ Get human-readable pipeline description. Returns: Multi-line string describing pipeline """ lines = [f"Pipeline: {self.name}", "=" * 50] for i, processor in enumerate(self.processors): lines.append(f"{i + 1}. {processor.name} ({processor.__class__.__name__})") if hasattr(processor, "_parameters"): for key, value in processor._parameters.items(): lines.append(f" - {key}: {value}") return "\n".join(lines)
[docs] def to_dict(self) -> dict[str, Any]: """ Serialize pipeline to dictionary. Returns: Dictionary representation """ return { "name": self.name, "processors": [ { "class": proc.__class__.__name__, "name": proc.name, "parameters": proc._parameters if hasattr(proc, "_parameters") else {}, } for proc in self.processors ], }
[docs] def map( self, inputs: list[str | ProcessingContext], loader_factory: Callable[[str], "Processor"] | None = None, parallel: bool = False, n_jobs: int = -1, on_error: str = "continue", keep_raw: bool = True, ) -> "BatchResult": """ Run the pipeline on multiple inputs and return a result per input. Each input can be: - A ``ProcessingContext`` — passed directly as ``initial_context``. - A **file path string** — a fresh :class:`~facet.io.Loader` is created automatically for each path via *loader_factory*. .. note:: Do **not** add a :class:`~facet.io.Loader` processor to the pipeline when using ``map()``. Loading is handled outside the pipeline so that each file gets its own isolated loader instance. Args: inputs: List of file paths or ``ProcessingContext`` objects. loader_factory: ``Callable[[path], Processor]`` that creates a fresh loader for each path string. Defaults to ``lambda p: Loader(path=p, preload=True)``. parallel: Whether to pass ``parallel=True`` to each ``pipeline.run()``. n_jobs: Passed through to ``pipeline.run()``. on_error: ``"continue"`` (default) — log failures and keep going; ``"raise"`` — re-raise the first error encountered. keep_raw: If ``False``, the Raw data is released from each result after the pipeline run completes, keeping only metrics and history in memory. Set to ``False`` when processing many files and you only need summary statistics. Defaults to ``True``. Returns: :class:`~facet.core.pipeline.BatchResult` containing one :class:`~facet.core.pipeline.PipelineResult` per input, in the same order. It behaves like a plain list but also offers ``BatchResult.print_summary()`` and ``BatchResult.summary_df``. Example:: pipeline = Pipeline([ TriggerDetector(regex=r"\\b1\\b"), UpSample(factor=10), AASCorrection(window_size=30), DownSample(factor=10), SNRCalculator(), ]) results = pipeline.map( ["sub-01.edf", "sub-02.edf", "sub-03.edf"], keep_raw=False, ) results.print_summary() """ from ..io.loaders import Loader as _Loader if loader_factory is None: loader_factory = lambda p: _Loader(path=p, preload=True) # noqa: E731 for proc in self.processors: if isinstance(proc, _Loader): raise ValueError( "A Loader processor was found inside the pipeline passed to map(). " "map() handles loading automatically — remove the Loader from the " "pipeline and pass file paths directly to map()." ) results: list[PipelineResult] = [] labels: list[str] = [] for item in inputs: if isinstance(item, ProcessingContext): run_pipeline = self initial_ctx = item label = repr(item) else: label = str(item) initial_ctx = None loader = loader_factory(item) try: initial_ctx = loader.execute(None) except Exception as exc: logger.error(f"Loader failed for '{label}': {exc}") result = PipelineResult( context=None, success=False, error=exc, failed_processor=getattr(loader, "name", "loader"), ) results.append(result) labels.append(label) if on_error == "raise": raise continue run_pipeline = self logger.info(f"Pipeline.map: processing '{label}'") result = run_pipeline.run( initial_context=initial_ctx, parallel=parallel, n_jobs=n_jobs, ) if not keep_raw and result.success: result.release_raw() results.append(result) labels.append(label) if not result.success and on_error == "raise": raise result.error return BatchResult(results, labels=labels)
[docs] def __len__(self) -> int: """Get number of processors.""" return len(self.processors)
[docs] def __getitem__(self, index: int) -> Processor: """Get processor by index.""" return self.processors[index]
[docs] def __repr__(self) -> str: """String representation.""" return f"Pipeline(name='{self.name}', n_processors={len(self.processors)})"
[docs] class PipelineBuilder: """ Fluent builder for constructing pipelines. Example:: pipeline = (PipelineBuilder() .add(Loader("data.edf")) .highpass(1.0) .upsample(10) .detect_triggers(r"\\btrigger\\b") .aas_correction() .export_edf("output.edf") .build()) """
[docs] def __init__(self, name: str | None = None): """ Initialize builder. Args: name: Optional pipeline name """ self._processors: list[Processor] = [] self._name = name
[docs] def add(self, processor: Processor) -> "PipelineBuilder": """ Add custom processor. Args: processor: Processor to add Returns: Self for chaining """ self._processors.append(processor) return self
[docs] def add_if(self, condition: bool, processor: Processor) -> "PipelineBuilder": """ Add processor conditionally. Args: condition: Whether to add processor processor: Processor to add Returns: Self for chaining """ if condition: self._processors.append(processor) return self
[docs] def build(self) -> Pipeline: """ Build the pipeline. Returns: Constructed Pipeline instance """ return Pipeline(self._processors, name=self._name)
# Convenience methods can be added here for common processors # These will be populated as we implement specific processors