Skip to content

Context Managers

bind_pipeline

bind_pipeline(pr: 'PipelineReport')

Bind a :class:PipelineReport to the current context.

Makes nested helpers (e.g., :func:pipeline_step) discover the active pipeline implicitly via a context variable, avoiding the need to pass pr on every call. Safe across threads/async tasks.

Parameters:

Yields:

Examples:

>>> report = PipelineReport(...)
>>> with bind_pipeline(report):
...     # Inside this block, pipeline_step(None, ...) will use `report`
...     with pipeline_step(None, "discover", label="Discover inputs") as st:
...         st.note("Scanning directory")
Source code in src/pipeline_watcher/core.py
@contextmanager
def bind_pipeline(pr: "PipelineReport"):
    """Bind a :class:`PipelineReport` to the current context.

    Makes nested helpers (e.g., :func:`pipeline_step`) discover the active
    pipeline implicitly via a context variable, avoiding the need to pass
    ``pr`` on every call. Safe across threads/async tasks.

    Parameters
    ----------
    pr : PipelineReport
        The pipeline report to bind in this context.

    Yields
    ------
    PipelineReport
        The same report passed in, for convenience.

    Examples
    --------
    >>> report = PipelineReport(...)
    >>> with bind_pipeline(report): # doctest: +SKIP
    ...     # Inside this block, pipeline_step(None, ...) will use `report`
    ...     with pipeline_step(None, "discover", label="Discover inputs") as st:
    ...         st.note("Scanning directory")
    """
    token = _current_pipeline_report.set(pr)
    try:
        yield pr
    finally:
        _current_pipeline_report.reset(token)

pipeline_step

pipeline_step(pr: Optional[PipelineReport], label: str, *, id: str | None = None, **other_options)

Context manager for a step inside a PipelineReport, governed by WatcherSettings.

Pass any WatcherSettings fields as kwargs (e.g., raise_on_exception=True). They apply only within this context; otherwise current settings are used.

The associated PipelineReport is either passed explicitly via pr= or discovered from the current bind_pipeline() context.

Source code in src/pipeline_watcher/core.py
@contextmanager
def pipeline_step(
    pr: Optional[PipelineReport],
    label: str,
    *,
    id: str | None = None,
    **other_options,
):
    """
    Context manager for a step inside a PipelineReport, governed by WatcherSettings.

    Pass any WatcherSettings fields as kwargs (e.g., raise_on_exception=True).
    They apply only within this context; otherwise current settings are used.

    The associated PipelineReport is either passed explicitly via `pr=`
    or discovered from the current bind_pipeline() context.
    """
    # Filter only valid WatcherSettings keys
    settings_overrides = {k: v for k, v in other_options.items() if k in _SETTINGS_KEYS}

    # Bind to a pipeline if not explicitly provided
    pr = pr or _current_pipeline_report.get(None)
    if pr is None:
        raise RuntimeError(
            "pipeline_step requires a PipelineReport: pass `pr=` or call within `with bind_pipeline(pr):`"
        )

    # Apply overrides (if any) for just this block
    with use_settings(**settings_overrides) as settings:
        st = StepReport.begin(label, id=id)

        # Optional capture (settings-driven)
        stdout_buf = StringIO() if settings.capture_streams else None
        stderr_buf = StringIO() if settings.capture_streams else None
        warn_list: list[warnings.WarningMessage] | None = None
        encountered_exception = False

        with ExitStack() as stack:
            if settings.capture_warnings:
                warn_list = stack.enter_context(warnings.catch_warnings(record=True))
                warnings.simplefilter("default")

            if settings.capture_streams:
                if stdout_buf is not None:
                    stack.enter_context(redirect_stdout(stdout_buf))
                if stderr_buf is not None:
                    stack.enter_context(redirect_stderr(stderr_buf))

            try:
                # User code runs here
                yield st
                st.end() # fallback in case user forgets.
            except BaseException as e:
                encountered_exception = True

                exc_type_name = type(e).__name__

                # Record error + optional detail
                st.errors.append(f"{exc_type_name}: {e}")

                if settings.store_traceback:
                    tb = "".join(
                        traceback.format_exception(
                            type(e), e, e.__traceback__, limit=settings.traceback_limit
                        )
                    )
                    if tb:
                        st.metadata["traceback"] = tb

                # Status line surfaces exception *name* even if traceback is suppressed
                st.fail(f"Unhandled {exc_type_name} in pipeline step")

                if settings.should_raise(e):
                    raise

                if msg := settings.suppression_breadcrumb(e):
                    st.warnings.append(msg)

            finally:
                # Persist diagnostics
                if stdout_buf is not None:
                    st.metadata["stdout"] = stdout_buf.getvalue()
                if stderr_buf is not None:
                    st.metadata["stderr"] = stderr_buf.getvalue()
                if warn_list is not None:
                    st.metadata["warnings"] = [
                        f"{w.category.__name__}: {w.message}"  # type: ignore[attr-defined]
                        for w in warn_list
                    ]

                # Let the pipeline/file own finalization:
                # append_step(st) will call st.end(), enforce id uniqueness,
                # and update the FileReport's update time.
                pr.append_step(st)

                # Optional: mirror pipeline_file best-effort auto-save
                if encountered_exception and settings.save_on_exception:
                    try:
                        save_path = settings.exception_save_path_override or pr.output_path
                        if save_path:
                            pr.save(save_path)
                        else:
                            st.warnings.append("auto-save skipped: no output path configured")
                    except Exception as save_err:
                        st.warnings.append(f"save failed: {save_err!r}")

pipeline_file

pipeline_file(pr: Optional['PipelineReport'], path: Path | str, *, file_id: str | None = None, file_save_to: Path | str | None = None, pipeline_save_to: Path | str | None = None, metadata: dict | None = None, set_stage_on_enter: bool = False, banner_stage: str | None = None, banner_percent_on_exit: int | None = None, banner_message_on_exit: str | None = None, **other_options)

Context manager for processing a single file within a PipelineReport.

This block is responsible for: - Creating and finalizing a FileReport for the file, - Capturing warnings, stdout/stderr, and unhandled exceptions, - Appending the finalized FileReport to the PipelineReport, and - Persisting pipeline and/or file-level snapshots according to the configured save policy.

Parameters:

  • pr

    (PipelineReport or None) –

    The pipeline report to attach this file to. If None, the currently bound pipeline (via bind_pipeline) is used. If no pipeline is available, a RuntimeError is raised.

  • path

    (str or Path) –

    Filesystem path associated with this file (used for naming and diagnostics).

  • file_id

    (str, default: None ) –

    Stable identifier for the file, if available.

  • file_save_to

    (str or Path, default: None ) –

    If provided, a finalized snapshot of the FileReport is written atomically to this path on exit (success or exception). The snapshot is written only after diagnostics and exception handling are complete.

  • pipeline_save_to

    (str or Path, default: None ) –

    If provided, an additional atomic snapshot of the PipelineReport is written to this path on exit. This is an additive copy and does not replace the canonical pipeline output path.

  • metadata

    (dict, default: None ) –

    Initial metadata to attach to the FileReport.

  • set_stage_on_enter

    (bool, default: False ) –

    If True, update the pipeline progress stage when entering the block.

  • banner_stage

    (optional, default: None ) –

    Optional progress banner updates applied on exit.

  • banner_percent_on_exit

    (optional, default: None ) –

    Optional progress banner updates applied on exit.

  • banner_message_on_exit

    (optional, default: None ) –

    Optional progress banner updates applied on exit.

  • **other_options

    Per-block overrides for WatcherSettings (e.g., raise_on_exception=True).

Saving and Exception Semantics
  • The FileReport is always finalized (end()) before any persistence.
  • If pr.output_path is set, the PipelineReport is atomically autosaved on every exit (success or exception) after the FileReport is appended.
  • If pipeline_save_to is provided, an additional copy of the pipeline report is saved to that path.
  • If file_save_to is provided, a per-file snapshot is saved atomically.
  • If an exception occurs:
    • It is always recorded on the FileReport.
    • It is re-raised only if dictated by WatcherSettings.
    • An exception-specific pipeline save is performed only if exception_save_path_override is set to a distinct path.
Notes
  • All writes performed by this context manager are best-effort and atomic; save failures are recorded as warnings and do not mask the original error.
  • FileReport snapshots are intentionally saved only by this context manager to guarantee they reflect a fully finalized state.
Source code in src/pipeline_watcher/core.py
@contextmanager
def pipeline_file(
    pr: Optional["PipelineReport"],
    path: Path | str,
    *,
    file_id: str | None = None,
    file_save_to: Path | str | None = None,
    pipeline_save_to: Path | str | None = None,
    metadata: dict | None = None,
    set_stage_on_enter: bool = False,
    banner_stage: str | None = None,
    banner_percent_on_exit: int | None = None,
    banner_message_on_exit: str | None = None,
    **other_options,
):
    """
    Context manager for processing a single file within a PipelineReport.

    This block is responsible for:
      - Creating and finalizing a FileReport for the file,
      - Capturing warnings, stdout/stderr, and unhandled exceptions,
      - Appending the finalized FileReport to the PipelineReport, and
      - Persisting pipeline and/or file-level snapshots according to
        the configured save policy.

    Parameters
    ----------
    pr : PipelineReport or None
        The pipeline report to attach this file to. If None, the currently
        bound pipeline (via ``bind_pipeline``) is used. If no pipeline is
        available, a RuntimeError is raised.
    path : str or pathlib.Path
        Filesystem path associated with this file (used for naming and
        diagnostics).
    file_id : str, optional
        Stable identifier for the file, if available.
    file_save_to : str or pathlib.Path, optional
        If provided, a finalized snapshot of the *FileReport* is written
        atomically to this path on exit (success or exception). The snapshot
        is written only after diagnostics and exception handling are complete.
    pipeline_save_to : str or pathlib.Path, optional
        If provided, an additional atomic snapshot of the *PipelineReport*
        is written to this path on exit. This is an additive copy and does
        not replace the canonical pipeline output path.
    metadata : dict, optional
        Initial metadata to attach to the FileReport.
    set_stage_on_enter : bool, optional
        If True, update the pipeline progress stage when entering the block.
    banner_stage, banner_percent_on_exit, banner_message_on_exit : optional
        Optional progress banner updates applied on exit.
    **other_options
        Per-block overrides for WatcherSettings (e.g., ``raise_on_exception=True``).

    Saving and Exception Semantics
    ------------------------------
    - The FileReport is always finalized (``end()``) before any persistence.
    - If ``pr.output_path`` is set, the PipelineReport is atomically autosaved
      on *every exit* (success or exception) after the FileReport is appended.
    - If ``pipeline_save_to`` is provided, an additional copy of the pipeline
      report is saved to that path.
    - If ``file_save_to`` is provided, a per-file snapshot is saved atomically.
    - If an exception occurs:
        * It is always recorded on the FileReport.
        * It is re-raised only if dictated by WatcherSettings.
        * An exception-specific pipeline save is performed only if
          ``exception_save_path_override`` is set to a distinct path.

    Notes
    -----
    - All writes performed by this context manager are best-effort and atomic;
      save failures are recorded as warnings and do not mask the original error.
    - FileReport snapshots are intentionally saved only by this context manager
      to guarantee they reflect a fully finalized state.
    """
    settings_overrides = {k: v for k, v in other_options.items() if k in _SETTINGS_KEYS}

    # Bind to a pipeline if not explicitly provided
    pr = pr or _current_pipeline_report.get(None)
    if pr is None:
        raise RuntimeError(
            "pipeline_file requires a PipelineReport: pass `pr=` or call within `with bind_pipeline(pr):`"
        )

    if not isinstance(path, (str, os.PathLike)):
        raise ValueError(f"path must be str or os.PathLike, not {type(path)!r}")

    # Apply settings overrides (if any) only for this block
    with use_settings(**settings_overrides) as settings:
        fr = FileReport.begin(
            path=Path(path),
            file_id=file_id,
            metadata=dict(metadata) if metadata else {},
        )
        fr._pipeline = pr

        # Optional banner update on enter
        if set_stage_on_enter:
            pr.set_progress(
                stage=banner_stage or fr.name,   # name always present from path
                percent=pr.percent,
                message=pr.message,
            )

        # Optional capture (settings-driven)
        stdout_buf = StringIO() if settings.capture_streams else None
        stderr_buf = StringIO() if settings.capture_streams else None
        warn_list: list[warnings.WarningMessage] | None = None

        with ExitStack() as stack:
            if settings.capture_warnings:
                warn_list = stack.enter_context(warnings.catch_warnings(record=True))
                warnings.simplefilter("default")
            if settings.capture_streams:
                if stdout_buf:
                    stack.enter_context(redirect_stdout(stdout_buf))
                if stderr_buf:
                    stack.enter_context(redirect_stderr(stderr_buf))

            encountered_exception = False
            try:
                yield fr
                fr.end() # not strictly necessary, but end is idempotent, and user may forget.
            except BaseException as e:
                encountered_exception = True

                is_supp = settings.is_suppressed(e)
                fr.errors.append(exception_summary(e))

                if settings.store_traceback:
                    tb = "".join(
                        traceback.format_exception(type(e), e, e.__traceback__, limit=settings.traceback_limit)
                    )
                    if tb:
                        fr.metadata["traceback"] = tb

                fr.fail(
                    "Handled exception (suppressed)" if is_supp else "Unhandled exception while processing file"
                )

                if settings.should_raise(e):
                    raise
                if (msg := settings.suppression_breadcrumb(e)):
                    fr.warnings.append(msg)

            finally:
                # Persist diagnostics
                if stdout_buf is not None:
                    fr.metadata["stdout"] = stdout_buf.getvalue()
                if stderr_buf is not None:
                    fr.metadata["stderr"] = stderr_buf.getvalue()
                if warn_list is not None:
                    fr.metadata["warnings"] = [
                        f"{w.category.__name__}: {w.message}"  # type: ignore[attr-defined]
                        for w in warn_list
                    ]
                fr.end()

                # after fr.end()
                if file_save_to is not None:
                    try:
                        atomic_write_json(
                            Path(file_save_to),
                            fr.model_dump(mode="json"),
                            indent=2,
                            encoding="utf-8",
                        )
                    except Exception as save_err:
                        fr.warnings.append(f"file report save failed: {save_err!r}")

                # Append to pipeline and (optionally) update banner on exit
                try:
                    pr.append_file(fr)
                    if (banner_stage is not None
                        or banner_percent_on_exit is not None
                        or banner_message_on_exit is not None):
                        pr.set_progress(
                            stage=banner_stage or pr.stage,
                            percent=pr.percent if banner_percent_on_exit is None else banner_percent_on_exit,
                            message=banner_message_on_exit or pr.message,
                        )
                finally:

                    # Canonical autosave
                    if pr.output_path is not None:
                        try:
                            pr.save(pr.output_path)
                        except Exception as save_err:
                            fr.warnings.append(f"pipeline autosave failed: {save_err!r}")

                    # Additional copy (Option B)
                    if pipeline_save_to is not None:
                        try:
                            pr.save(pipeline_save_to)
                        except Exception as save_err:
                            fr.warnings.append(f"pipeline save_to failed: {save_err!r}")

                    # Best-effort save-on-exception
                    if (
                        encountered_exception
                        and settings.save_on_exception
                        and settings.exception_save_path_override is not None
                        and (pr.output_path is None or Path(settings.exception_save_path_override) != Path(pr.output_path))
                    ):
                        try:
                            pr.save(settings.exception_save_path_override)
                        except Exception as save_err:
                            fr.warnings.append(f"save failed: {save_err!r}")

file_step

file_step(file_report: FileReport, label: str, *, id: str | None = None, step_save_to: Path | str | None = None, pipeline_save_to: Path | str | None = None, **other_options)

Context manager for a step inside a FileReport, governed by WatcherSettings.

Pass any WatcherSettings fields as kwargs (e.g., raise_on_exception=True). They apply only within this context; otherwise current settings are used.

Source code in src/pipeline_watcher/core.py
@contextmanager
def file_step(
    file_report: FileReport,
    label: str,
    *,
    id: str | None = None,
    step_save_to: Path | str | None = None,
    pipeline_save_to: Path | str | None = None,
    **other_options,
):
    """
    Context manager for a step inside a FileReport, governed by WatcherSettings.

    Pass any WatcherSettings fields as kwargs (e.g., raise_on_exception=True).
    They apply only within this context; otherwise current settings are used.
    """
    settings_overrides = {
        k: v for k, v in other_options.items() if k in _SETTINGS_KEYS
    }

    with use_settings(**settings_overrides) as settings:
        st = StepReport.begin(label, id=id)

        stdout_buf = StringIO() if settings.capture_streams else None
        stderr_buf = StringIO() if settings.capture_streams else None
        warn_list: list[warnings.WarningMessage] | None = None
        encountered_exception = False

        with ExitStack() as stack:
            if settings.capture_warnings:
                warn_list = stack.enter_context(
                    warnings.catch_warnings(record=True)
                )
                warnings.simplefilter("default")

            if settings.capture_streams:
                if stdout_buf is not None:
                    stack.enter_context(redirect_stdout(stdout_buf))
                if stderr_buf is not None:
                    stack.enter_context(redirect_stderr(stderr_buf))

            try:
                yield st

            except BaseException as e:
                encountered_exception = True

                is_supp = settings.is_suppressed(e)
                st.errors.append(exception_summary(e))  # same helper used by pipeline_file

                if settings.store_traceback:
                    tb = "".join(
                        traceback.format_exception(
                            type(e), e, e.__traceback__, limit=settings.traceback_limit
                        )
                    )
                    if tb:
                        st.metadata["traceback"] = tb

                st.fail(
                    "Handled exception (suppressed)"
                    if is_supp
                    else "Unhandled exception while running file step"
                )

                if settings.should_raise(e):
                    raise

                if (msg := settings.suppression_breadcrumb(e)):
                    st.warnings.append(msg)

            finally:
                if stdout_buf is not None:
                    st.metadata["stdout"] = stdout_buf.getvalue()
                if stderr_buf is not None:
                    st.metadata["stderr"] = stderr_buf.getvalue()
                if warn_list is not None:
                    st.metadata["warnings"] = [
                        f"{w.category.__name__}: {w.message}"
                        for w in warn_list
                    ]

                # Finalize step via FileReport: append_step calls st.end(),
                # enforces id uniqueness, updates timestamps, etc.
                file_report.append_step(st)

                # Step snapshot AFTER append_step finalization
                if step_save_to is not None:
                    try:
                        atomic_write_json(
                            Path(step_save_to),
                            st.model_dump(mode="json"),
                            indent=2,
                            encoding="utf-8",
                        )
                    except Exception as save_err:
                        st.warnings.append(f"step report save failed: {save_err!r}")

                # Additional copy (Option B)
                if file_report._pipeline is not None and pipeline_save_to is not None:
                    try:
                        file_report._pipeline.save(pipeline_save_to)
                    except Exception as save_err:
                        st.warnings.append(f"pipeline save_to failed: {save_err!r}")

                # Canonical autosave (every exit)
                if file_report._pipeline is not None and file_report._pipeline.output_path is not None:
                    try:
                        file_report._pipeline.save(file_report._pipeline.output_path)
                    except Exception as save_err:
                        st.warnings.append(f"pipeline autosave failed: {save_err!r}")

                # Best-effort save-on-exception to override path only (distinct)
                if (
                    encountered_exception
                    and settings.save_on_exception
                    and file_report._pipeline is not None
                    and settings.exception_save_path_override is not None
                    and (file_report._pipeline.output_path is None
                         or Path(settings.exception_save_path_override)\
                         != Path(file_report._pipeline.output_path))
                ):
                    try:
                        file_report._pipeline.save(settings.exception_save_path_override)
                    except Exception as save_err:
                        st.warnings.append(f"save-on-exception failed: {save_err!r}")