Context Managers

bind_pipeline

bind_pipeline(pr: 'PipelineReport')

Bind a PipelineReport to the current context.

Lets nested helpers (e.g., pipeline_step, pipeline_file) discover the active pipeline via a thread/async-safe context variable, so you don’t have to pass pr explicitly.

Parameters:

  • pr

    (PipelineReport) –

    The pipeline to bind for the duration of the context.

Yields:

  • PipelineReport –

    The same report object (convenience).

Examples:

>>> report = PipelineReport(...)
>>> with bind_pipeline(report):
...     # inside, helpers can omit `pr`
...     ...
Source code in src/pipeline_watcher/core.py
@contextmanager
def bind_pipeline(pr: "PipelineReport"):
    """Bind a :class:`PipelineReport` to the current context.

    Lets nested helpers (e.g., :func:`pipeline_step`, :func:`pipeline_file`)
    discover the active pipeline via a thread/async-safe context variable,
    so you don’t have to pass ``pr`` explicitly.

    Parameters
    ----------
    pr : PipelineReport
        The pipeline to bind for the duration of the context.

    Yields
    ------
    PipelineReport
        The same report object (convenience).

    Examples
    --------
    >>> report = PipelineReport(...)
    >>> with bind_pipeline(report):
    ...     # inside, helpers can omit `pr`
    ...     ...
    """
    token = _current_pipeline_report.set(pr)
    try:
        yield pr
    finally:
        _current_pipeline_report.reset(token)

pipeline_step

pipeline_step(pr: Optional[PipelineReport], label: str, *, id: str | None = None, **other_options)

Context manager for a step inside a PipelineReport, governed by WatcherSettings.

Pass any WatcherSettings fields as kwargs (e.g., raise_on_exception=True). They apply only within this context; otherwise current settings are used.

The associated PipelineReport is either passed explicitly via pr= or discovered from the current bind_pipeline() context.

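Example (a minimal sketch; the label is illustrative, PipelineReport(...) stands in for however the report is constructed, and raise_on_exception is one of the WatcherSettings fields mentioned above):

>>> report = PipelineReport(...)
>>> with bind_pipeline(report):
...     with pipeline_step(None, "load data", raise_on_exception=True) as st:
...         ...  # step body; errors are recorded on `st` and re-raised per settings
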
Source code in src/pipeline_watcher/core.py
@contextmanager
def pipeline_step(
    pr: Optional[PipelineReport],
    label: str,
    *,
    id: str | None = None,
    **other_options,
):
    """
    Context manager for a step inside a PipelineReport, governed by WatcherSettings.

    Pass any WatcherSettings fields as kwargs (e.g., raise_on_exception=True).
    They apply only within this context; otherwise current settings are used.

    The associated PipelineReport is either passed explicitly via `pr=`
    or discovered from the current bind_pipeline() context.
    """
    # Filter only valid WatcherSettings keys
    settings_overrides = {k: v for k, v in other_options.items() if k in _SETTINGS_KEYS}

    # Bind to a pipeline if not explicitly provided
    pr = pr or _current_pipeline_report.get(None)
    if pr is None:
        raise RuntimeError(
            "pipeline_step requires a PipelineReport: pass `pr=` or call within `with bind_pipeline(pr):`"
        )

    # Apply overrides (if any) for just this block
    with use_settings(**settings_overrides) as settings:
        st = StepReport.begin(label, id=id)

        # Optional capture (settings-driven)
        stdout_buf = StringIO() if settings.capture_streams else None
        stderr_buf = StringIO() if settings.capture_streams else None
        warn_list: list[warnings.WarningMessage] | None = None
        encountered_exception = False

        with ExitStack() as stack:
            if settings.capture_warnings:
                warn_list = stack.enter_context(warnings.catch_warnings(record=True))
                warnings.simplefilter("default")

            if settings.capture_streams:
                if stdout_buf is not None:
                    stack.enter_context(redirect_stdout(stdout_buf))
                if stderr_buf is not None:
                    stack.enter_context(redirect_stderr(stderr_buf))

            try:
                # User code runs here
                yield st
                st.end() # fallback in case user forgets.
            except BaseException as e:
                encountered_exception = True

                exc_type_name = type(e).__name__

                # Record error + optional detail
                st.errors.append(f"{exc_type_name}: {e}")

                if settings.store_traceback:
                    tb = "".join(
                        traceback.format_exception(
                            type(e), e, e.__traceback__, limit=settings.traceback_limit
                        )
                    )
                    if tb:
                        st.metadata["traceback"] = tb

                # Status line surfaces exception *name* even if traceback is suppressed
                st.fail(f"Unhandled {exc_type_name} in pipeline step")

                if settings.should_raise(e):
                    raise

                if msg := settings.suppression_breadcrumb(e):
                    st.warnings.append(msg)

            finally:
                # Persist diagnostics
                if stdout_buf is not None:
                    st.metadata["stdout"] = stdout_buf.getvalue()
                if stderr_buf is not None:
                    st.metadata["stderr"] = stderr_buf.getvalue()
                if warn_list is not None:
                    st.metadata["warnings"] = [
                        f"{w.category.__name__}: {w.message}"  # type: ignore[attr-defined]
                        for w in warn_list
                    ]

                # Let the pipeline/file own finalization:
                # append_step(st) will call st.end(), enforce id uniqueness,
                # and update the FileReport's update time.
                pr.append_step(st)

                # Optional: mirror pipeline_file best-effort auto-save
                if encountered_exception and settings.save_on_exception:
                    try:
                        save_path = settings.exception_save_path_override or pr.output_path
                        if save_path:
                            pr.save(save_path)
                        else:
                            st.warnings.append("auto-save skipped: no output path configured")
                    except Exception as save_err:
                        st.warnings.append(f"save failed: {save_err!r}")

pipeline_file

pipeline_file(pr: Optional['PipelineReport'], path: Path | str, *, file_id: str | None = None, metadata: dict | None = None, set_stage_on_enter: bool = False, banner_stage: str | None = None, banner_percent_on_exit: int | None = None, banner_message_on_exit: str | None = None, **other_options)

Per-file processing block using WatcherSettings as the source of truth.

Pass any WatcherSettings fields as kwargs (e.g., raise_on_exception=True) and they will apply only within this context; otherwise the current context settings are used.

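Example (a minimal sketch; the path is illustrative and PipelineReport(...) stands in for however the report is constructed):

>>> report = PipelineReport(...)
>>> with bind_pipeline(report):
...     with pipeline_file(None, "data/input.csv", set_stage_on_enter=True) as fr:
...         ...  # per-file work; errors, warnings, and captured output land on `fr`
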
Source code in src/pipeline_watcher/core.py
@contextmanager
def pipeline_file(
    pr: Optional["PipelineReport"],
    path: Path | str,
    *,
    file_id: str | None = None,
    metadata: dict | None = None,
    set_stage_on_enter: bool = False,
    banner_stage: str | None = None,
    banner_percent_on_exit: int | None = None,
    banner_message_on_exit: str | None = None,
    **other_options,
):
    """
    Per-file processing block using WatcherSettings as the source of truth.

    Pass any WatcherSettings fields as kwargs (e.g., raise_on_exception=True)
    and they will apply only within this context; otherwise the current
    context settings are used.
    """
    settings_overrides = {k: v for k, v in other_options.items() if k in _SETTINGS_KEYS}

    # Bind to a pipeline if not explicitly provided
    pr = pr or _current_pipeline_report.get(None)
    if pr is None:
        raise RuntimeError(
            "pipeline_file requires a PipelineReport: pass `pr=` or call within `with bind_pipeline(pr):`"
        )

    if not isinstance(path, (str, os.PathLike)):
        raise ValueError(f"path must be str or os.PathLike, not {type(path)!r}")

    # Apply settings overrides (if any) only for this block
    with use_settings(**settings_overrides) as settings:
        fr = FileReport.begin(
            path=Path(path),
            file_id=file_id,
            metadata=dict(metadata) if metadata else {},
        )
        fr._pipeline = pr

        # Optional banner update on enter
        if set_stage_on_enter:
            pr.set_progress(
                stage=banner_stage or fr.name,   # name always present from path
                percent=pr.percent,
                message=pr.message,
            )

        # Optional capture (settings-driven)
        stdout_buf = StringIO() if settings.capture_streams else None
        stderr_buf = StringIO() if settings.capture_streams else None
        warn_list: list[warnings.WarningMessage] | None = None

        with ExitStack() as stack:
            if settings.capture_warnings:
                warn_list = stack.enter_context(warnings.catch_warnings(record=True))
                warnings.simplefilter("default")
            if settings.capture_streams:
                if stdout_buf is not None:
                    stack.enter_context(redirect_stdout(stdout_buf))
                if stderr_buf is not None:
                    stack.enter_context(redirect_stderr(stderr_buf))

            encountered_exception = False
            try:
                yield fr
                fr.end() # not strictly necessary, but end is idempotent, and user may forget.
            except BaseException as e:
                # --- Always record first ---
                encountered_exception = True
                fr.errors.append(f"{type(e).__name__}: {e}")
                if settings.store_traceback:
                    tb = "".join(
                        traceback.format_exception(type(e), e, e.__traceback__, limit=settings.traceback_limit)
                    )
                    if tb:
                        fr.metadata["traceback"] = tb
                fr.fail("Unhandled exception while processing file")

                # --- Decide raising via settings helpers ---
                if settings.should_raise(e):
                    raise
                if (msg := settings.suppression_breadcrumb(e)):
                    fr.warnings.append(msg)
            finally:
                # Persist diagnostics
                if stdout_buf is not None:
                    fr.metadata["stdout"] = stdout_buf.getvalue()
                if stderr_buf is not None:
                    fr.metadata["stderr"] = stderr_buf.getvalue()
                if warn_list is not None:
                    fr.metadata["warnings"] = [
                        f"{w.category.__name__}: {w.message}"  # type: ignore[attr-defined]
                        for w in warn_list
                    ]
                fr.end()

                # Append to pipeline and (optionally) update banner on exit
                try:
                    pr.append_file(fr)
                    if (banner_stage is not None
                        or banner_percent_on_exit is not None
                        or banner_message_on_exit is not None):
                        pr.set_progress(
                            stage=banner_stage or pr.stage,
                            percent=pr.percent if banner_percent_on_exit is None else banner_percent_on_exit,
                            message=banner_message_on_exit or pr.message,
                        )
                finally:
                    # Best-effort save-on-exception
                    if encountered_exception and settings.save_on_exception:
                        try:
                            save_path = settings.exception_save_path_override or pr.output_path
                            if save_path:
                                pr.save(save_path)
                            else:
                                fr.warnings.append("auto-save skipped: no output path configured")
                        except Exception as save_err:
                            fr.warnings.append(f"save failed: {save_err!r}")

file_step

file_step(file_report: FileReport, label: str, *, id: str | None = None, **other_options)

Context manager for a step inside a FileReport, governed by WatcherSettings.

Pass any WatcherSettings fields as kwargs (e.g., raise_on_exception=True). They apply only within this context; otherwise current settings are used.

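Example (a minimal sketch combining pipeline_file and file_step; the path and label are illustrative, and capture_streams is one of the WatcherSettings fields mentioned above):

>>> report = PipelineReport(...)
>>> with bind_pipeline(report):
...     with pipeline_file(None, "data/input.csv") as fr:
...         with file_step(fr, "parse", capture_streams=True) as st:
...             ...  # step work; stdout/stderr are captured into st.metadata
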
Source code in src/pipeline_watcher/core.py
@contextmanager
def file_step(
    file_report: FileReport,
    label: str,
    *,
    id: str | None = None,
    **other_options,
):
    """
    Context manager for a step inside a FileReport, governed by WatcherSettings.

    Pass any WatcherSettings fields as kwargs (e.g., raise_on_exception=True).
    They apply only within this context; otherwise current settings are used.
    """
    settings_overrides = {
        k: v for k, v in other_options.items() if k in _SETTINGS_KEYS
    }

    with use_settings(**settings_overrides) as settings:
        st = StepReport.begin(label, id=id)

        stdout_buf = StringIO() if settings.capture_streams else None
        stderr_buf = StringIO() if settings.capture_streams else None
        warn_list: list[warnings.WarningMessage] | None = None
        encountered_exception = False

        with ExitStack() as stack:
            if settings.capture_warnings:
                warn_list = stack.enter_context(
                    warnings.catch_warnings(record=True)
                )
                warnings.simplefilter("default")

            if settings.capture_streams:
                if stdout_buf is not None:
                    stack.enter_context(redirect_stdout(stdout_buf))
                if stderr_buf is not None:
                    stack.enter_context(redirect_stderr(stderr_buf))

            try:
                yield st
                st.end() # fallback in case user forgets.

            except BaseException as e:
                encountered_exception = True
                exc_type_name = type(e).__name__

                st.errors.append(f"{exc_type_name}: {e}")

                if settings.store_traceback:
                    tb = "".join(
                        traceback.format_exception(
                            type(e), e, e.__traceback__,
                            limit=settings.traceback_limit,
                        )
                    )
                    if tb:
                        st.metadata["traceback"] = tb

                st.fail(f"Unhandled {exc_type_name} in file step")

                if settings.should_raise(e):
                    raise

                if msg := settings.suppression_breadcrumb(e):
                    st.warnings.append(msg)

            finally:
                if stdout_buf is not None:
                    st.metadata["stdout"] = stdout_buf.getvalue()
                if stderr_buf is not None:
                    st.metadata["stderr"] = stderr_buf.getvalue()
                if warn_list is not None:
                    st.metadata["warnings"] = [
                        f"{w.category.__name__}: {w.message}"
                        for w in warn_list
                    ]

                # Finalize step via FileReport: append_step calls st.end(),
                # enforces id uniqueness, updates timestamps, etc.
                file_report.append_step(st)

                # Auto-save via owning pipeline, if available
                if encountered_exception and settings.save_on_exception:
                    pipeline = getattr(file_report, "pipeline", None)
                    if pipeline is not None:
                        try:
                            save_path = (
                                settings.exception_save_path_override
                                or pipeline.output_path
                            )
                            if save_path:
                                pipeline.save(save_path)
                            else:
                                st.warnings.append(
                                    "auto-save skipped: no pipeline output path configured"
                                )
                        except Exception as save_err:
                            st.warnings.append(f"save failed: {save_err!r}")
                    else:
                        # Invariant *should* be: file_report is always tied to a pipeline.
                        # But this keeps the behavior graceful if that’s ever relaxed.
                        st.warnings.append(
                            "auto-save skipped: file_report not attached to a pipeline"
                        )