File & Step

ReportBase

Bases: BaseModel, ABC, ReviewHelpers

Abstract base for report units (steps/files/batches).

Note:

Subclasses must implement the ok property before they can be instantiated.

Provides a common lifecycle (start → {succeed|fail|skip} → end), structured messaging (notes, warnings, errors), metadata, and HITL review flags. Subclasses must define the ok property, which end uses to infer a terminal status when none has been set explicitly.

Attributes:

  • status (Status) –

    Current lifecycle status (defaults to PENDING).

  • percent (int) –

    Progress percentage 0..100 (informational; not enforced).

  • started_at (datetime | None) –

    UTC timestamp when processing started.

  • finished_at (datetime | None) –

    UTC timestamp when processing finalized.

  • notes (list[str]) –

    Freeform narrative messages intended for UI display.

  • errors (list[str]) –

    Fatal issues; presence typically implies failure.

  • warnings (list[str]) –

    Non-fatal issues worth surfacing to users.

  • metadata (dict[str, Any]) –

    Arbitrary structured context for search/analytics/UI.

  • review (ReviewFlag) –

    Human-in-the-loop flag (flagged + optional reason).

  • report_version (str) –

    Schema version written to JSON artifacts.

  • defer_start (bool) –

    Initialization flag; when True, the automatic start() call on construction is skipped.

  • duration_ms (float | None) –

    Elapsed time in milliseconds (property).

  • failed (bool) –

    Return True if the unit has failed (property).

  • pending (bool) –

    Return True if the unit is pending (property).

  • running (bool) –

    Return True if the unit is running (property).

  • skipped (bool) –

    Return True if the unit was skipped (property).

  • succeeded (bool) –

    Return True if the unit has succeeded (property).

See Also

  • StepReport –

    Unit of work inside a file/batch.

  • FileReport –

    Ordered collection of steps for a single file.

Methods:

  • clear_review

    Clear the HITL review flag.

  • end

    Finalize the unit if not already terminal.

  • error

    Append an error message (does not change status).

  • fail

    Finalize as failed (FAILED).

  • model_post_init
  • note

    Append a user-facing note.

  • request_review

    Set the HITL review flag.

  • skip

    Finalize as skipped (SKIPPED).

  • start

    Mark the unit as running and stamp started_at if missing.

  • succeed

    Finalize successfully (SUCCEEDED) and set percent=100.

  • warn

    Append a non-fatal warning.

defer_start class-attribute instance-attribute

defer_start: bool = Field(default=False, exclude=True, repr=False)

duration_ms property

duration_ms: float | None

Elapsed time in milliseconds.

Returns None if timing cannot be determined (e.g., no started_at). Uses finished_at when present; otherwise uses 'now' to reflect in-flight duration. Clamped at >= 0 and rounded to 3 decimals.
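
The timing rule above can be sketched as a standalone function. This is a minimal illustration under stated assumptions, not the library's implementation: the name `elapsed_ms` and its `now` parameter are hypothetical and exist only to make the example deterministic.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def elapsed_ms(started_at: Optional[datetime],
               finished_at: Optional[datetime] = None,
               now: Optional[datetime] = None) -> Optional[float]:
    """Hypothetical stand-in for the duration_ms property."""
    if started_at is None:
        return None  # timing cannot be determined
    # Prefer the recorded finish time; otherwise measure up to "now".
    end = finished_at or now or datetime.now(timezone.utc)
    ms = (end - started_at).total_seconds() * 1000.0
    return round(max(ms, 0.0), 3)  # clamp at >= 0, round to 3 decimals


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
finished = elapsed_ms(t0, t0 + timedelta(milliseconds=1500))
in_flight = elapsed_ms(t0, now=t0 + timedelta(seconds=2))
clamped = elapsed_ms(t0, t0 - timedelta(seconds=1))
```

The last call shows the clamp: a finish time earlier than the start yields 0.0 rather than a negative duration.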

errors class-attribute instance-attribute

errors: List[str] = Field(default_factory=list)

failed property

failed: bool

Return True if the unit has failed.

finished_at class-attribute instance-attribute

finished_at: Optional[datetime] = None

metadata class-attribute instance-attribute

metadata: Dict[str, Any] = Field(default_factory=dict)

notes class-attribute instance-attribute

notes: List[str] = Field(default_factory=list)

ok abstractmethod property

ok: bool

Truthiness of success when inferring status.

Subclasses define the success condition used by :meth:end when the unit is not already in a terminal status. Typical implementations derive this from fields like errors, checks (for steps), or child statuses (for files/batches).

Returns:

  • bool

    True if the unit should be considered successful.
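
A minimal sketch of the abstract-property contract, using only the standard library. `MiniReportBase` and `MiniStep` are hypothetical stand-ins (the real class is a pydantic model), and the "no errors means ok" rule is just one plausible success condition.

```python
from abc import ABC, abstractmethod
from typing import List


class MiniReportBase(ABC):
    """Hypothetical, simplified stand-in for ReportBase."""

    def __init__(self) -> None:
        self.errors: List[str] = []

    @property
    @abstractmethod
    def ok(self) -> bool:
        """Subclasses decide what counts as success."""


class MiniStep(MiniReportBase):
    @property
    def ok(self) -> bool:
        # Simplest possible rule: success means no recorded errors.
        return not self.errors


try:
    MiniReportBase()  # abstract: `ok` is not implemented
    instantiated = True
except TypeError:
    instantiated = False

step = MiniStep()
ok_before = step.ok
step.errors.append("checksum mismatch")
ok_after = step.ok
```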

pending property

pending

Return True if the unit is pending.

percent class-attribute instance-attribute

percent: int = 0

report_version class-attribute instance-attribute

report_version: str = SCHEMA_VERSION

requires_human_review property

requires_human_review: bool

Whether the unit is flagged for human review.

Returns:

  • bool

    True if :attr:review.flagged is set, else False.

review class-attribute instance-attribute

review: ReviewFlag = Field(default_factory=ReviewFlag)

running property

running

Return True if the unit is running.

skipped property

skipped

Return True if the unit was skipped.

started_at class-attribute instance-attribute

started_at: Optional[datetime] = None

status class-attribute instance-attribute

status: Status = PENDING

succeeded property

succeeded

Return True if the unit has succeeded.

terminal property

terminal

Return True if the unit is terminal.

warnings class-attribute instance-attribute

warnings: List[str] = Field(default_factory=list)

clear_review

clear_review() -> HasReviewTV

Clear the HITL review flag.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def clear_review(self: HasReviewTV) -> HasReviewTV:
    """Clear the HITL review flag.

    Returns
    -------
    ReportBase
        Self.
    """
    self.review = ReviewFlag()
    return self

end

end() -> 'ReportBase'

Finalize the unit if not already terminal.

If already terminal (SUCCEEDED, FAILED, SKIPPED), this stamps finished_at if missing and returns. Otherwise, it infers success from ok (for example, a roll-up of step outcomes) and calls succeed or fail accordingly.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def end(self) -> "ReportBase":
    """Finalize the unit if not already terminal.

    If already terminal (``SUCCEEDED``, ``FAILED``, ``SKIPPED``), this stamps
    :attr:`finished_at` if missing and returns. Otherwise, infers success
    from :attr:`ok` (roll-up of step outcomes) and calls :meth:`succeed`
    or :meth:`fail` accordingly.

    Returns
    -------
    ReportBase
        Self.
    """
    if self.status.terminal:
        if not self.finished_at:
            self.finished_at = _now()
        return self
    return self.succeed() if self.ok else self.fail("One or more file steps failed")
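
The inference performed by end can be sketched with a small stand-in class. Everything here is a simplified assumption (`MiniUnit`, the "no errors" rule for ok, the failure message), and timestamps are left out; it only illustrates that a terminal status is never overwritten and that a non-terminal unit resolves through ok.

```python
from enum import Enum
from typing import List, Optional


class Status(str, Enum):
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    SKIPPED = "SKIPPED"

    @property
    def terminal(self) -> bool:
        return self in (Status.SUCCEEDED, Status.FAILED, Status.SKIPPED)


class MiniUnit:
    """Hypothetical stand-in for a report unit."""

    def __init__(self) -> None:
        self.status = Status.RUNNING
        self.errors: List[str] = []

    @property
    def ok(self) -> bool:
        return not self.errors

    def succeed(self) -> "MiniUnit":
        self.status = Status.SUCCEEDED
        return self

    def fail(self, message: Optional[str] = None) -> "MiniUnit":
        self.status = Status.FAILED
        if message:
            self.errors.append(message)
        return self

    def end(self) -> "MiniUnit":
        if self.status.terminal:
            return self  # already final: status is left untouched
        return self.succeed() if self.ok else self.fail("inferred failure")


clean = MiniUnit().end()

broken = MiniUnit()
broken.errors.append("parse error")  # recording text does not change status
status_after_error = broken.status
broken.end()

skipped = MiniUnit()
skipped.status = Status.SKIPPED
skipped.end()
```

Note that appending to errors leaves the unit RUNNING; only end (or an explicit fail) moves it to FAILED.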

error

error(msg: str) -> 'ReportBase'

Append an error message (does not change status).

Note
Prefer fail when the unit should end as FAILED. error only records the message and leaves the status unchanged.

Parameters:

  • msg

    (str) –

    Message to append to :attr:errors.

Returns:

  • ReportBase

    Self.

Notes

Use :meth:fail to change terminal status to FAILED. This method only records text.

Source code in src/pipeline_watcher/core.py
def error(self, msg: str) -> "ReportBase":
    """Append an error message (does not change status).

    Note
    ----
        Prefer :meth:`fail` when the unit should end as ``FAILED``. ``error`` only records the message and leaves the status unchanged.

    Parameters
    ----------
    msg : str
        Message to append to :attr:`errors`.

    Returns
    -------
    ReportBase
        Self.

    Notes
    -----
    Use :meth:`fail` to change terminal status to ``FAILED``. This method
    only records text.
    """
    self.errors.append(msg)
    return self

fail

fail(message: Optional[str] = None) -> 'ReportBase'

Finalize as failed (FAILED).

Parameters:

  • message

    (str, default: None ) –

    Error text to append to :attr:errors.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def fail(self, message: Optional[str] = None) -> "ReportBase":
    """Finalize as failed (``FAILED``).

    Parameters
    ----------
    message : str, optional
        Error text to append to :attr:`errors`.

    Returns
    -------
    ReportBase
        Self.
    """
    self.status = Status.FAILED
    if message:
        self.error(message)
    self.finished_at = _now()
    return self

model_post_init

model_post_init(__context) -> None
Source code in src/pipeline_watcher/core.py
def model_post_init(self, __context) -> None:
    # auto-start unless caller explicitly defers
    if not self.defer_start and self.pending:
        self.start()
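
The auto-start behavior can be illustrated without pydantic. The sketch below is an assumption-laden stand-in: it uses a dataclass `__post_init__` in place of `model_post_init`, and plain strings in place of the Status enum.

```python
from dataclasses import dataclass


@dataclass
class MiniReport:
    """Hypothetical stand-in: a dataclass instead of a pydantic model."""
    status: str = "PENDING"
    defer_start: bool = False

    def __post_init__(self) -> None:
        # Mirrors model_post_init: auto-start unless explicitly deferred.
        if not self.defer_start and self.status == "PENDING":
            self.start()

    def start(self) -> "MiniReport":
        self.status = "RUNNING"
        return self


auto = MiniReport()                        # started on construction
deferred = MiniReport(defer_start=True)    # stays pending until start()
restored = MiniReport(status="SUCCEEDED")  # non-pending units are not restarted
```

The pending check matters when a report is rebuilt from stored data: a unit that already carries a later status is left alone.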

note

note(msg: str) -> 'ReportBase'

Append a user-facing note.

Parameters:

  • msg

    (str) –

    Message to append to :attr:notes.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def note(self, msg: str) -> "ReportBase":
    """Append a user-facing note.

    Parameters
    ----------
    msg : str
        Message to append to :attr:`notes`.

    Returns
    -------
    ReportBase
        Self.
    """
    self.notes.append(msg)
    return self

request_review

request_review(reason: str | None = None) -> HasReviewTV

Set the HITL review flag.

Parameters:

  • reason

    (str, default: None ) –

    Short UI-visible reason for requesting review.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def request_review(self: HasReviewTV, reason: str | None = None) -> HasReviewTV:
    """Set the HITL review flag.

    Parameters
    ----------
    reason : str, optional
        Short UI-visible reason for requesting review.

    Returns
    -------
    ReportBase
        Self.
    """
    self.review = ReviewFlag(flagged=True, reason=reason)
    return self

skip

skip(reason: Optional[str] = None) -> 'ReportBase'

Finalize as skipped (SKIPPED).

Parameters:

  • reason

    (str, default: None ) –

    Rationale appended to :attr:notes as "Skipped: {reason}".

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def skip(self, reason: Optional[str] = None) -> "ReportBase":
    """Finalize as skipped (``SKIPPED``).

    Parameters
    ----------
    reason : str, optional
        Rationale appended to :attr:`notes` as ``"Skipped: {reason}"``.

    Returns
    -------
    ReportBase
        Self.
    """
    self.status = Status.SKIPPED
    if reason:
        self.notes.append(f"Skipped: {reason}")
    self.finished_at = _now()
    return self

start

start() -> 'ReportBase'

Mark the unit as running and stamp started_at if missing.

Returns:

  • ReportBase

    Self (for fluent chaining).

Examples:

>>> step = StepReport(...)
>>> step.start().note("begin").percent == 0
True
Source code in src/pipeline_watcher/core.py
def start(self) -> "ReportBase":
    """Mark the unit as running and stamp ``started_at`` if missing.

    Returns
    -------
    ReportBase
        Self (for fluent chaining).

    Examples
    --------
    >>> step = StepReport(...)
    >>> step.start().note("begin").percent == 0
    True
    """
    self.status = Status.RUNNING
    if not self.started_at:
        self.started_at = _now()
    return self

succeed

succeed() -> 'ReportBase'

Finalize successfully (SUCCEEDED) and set percent=100.

Also stamps finished_at to the current time.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def succeed(self) -> "ReportBase":
    """Finalize successfully (``SUCCEEDED``) and set ``percent=100``.

    Also stamps ``finished_at`` to the current time.

    Returns
    -------
    ReportBase
        Self.
    """
    self.status = Status.SUCCEEDED
    self.percent = 100
    self.finished_at = _now()
    return self

warn

warn(msg: str) -> 'ReportBase'

Append a non-fatal warning.

Parameters:

  • msg

    (str) –

    Message to append to :attr:warnings.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def warn(self, msg: str) -> "ReportBase":
    """Append a non-fatal warning.

    Parameters
    ----------
    msg : str
        Message to append to :attr:`warnings`.

    Returns
    -------
    ReportBase
        Self.
    """
    self.warnings.append(msg)
    return self

FileReport

Bases: ReportBase

Per-file processing timeline composed of ordered steps.

Aggregates StepReport items and provides helpers for appending terminal steps (success/failed/skipped) and requesting human-in-the-loop (HITL) review.

Attributes:

  FileReport-specific:

  • path (Path) –

    Source path or URI for display/debugging.

  • file_id (str or None) –

    Stable identifier for the file (preferably unique within a batch).

  • steps (list[StepReport]) –

    Ordered step sequence for this file.

  • n_steps (int) –

    Number of steps in the process (used to compute percent).

  • label (str) –

    Convenience label derived from :attr:path (basename).

  • name (str) –

    Convenience name derived from :attr:path (basename).

  • mime_type (str or None) –

    Guessed MIME type based on the path extension.

  • size_bytes (int or None) –

    Best-effort file size in bytes.

  • requires_human_review (bool) –

    Whether this file requires human review (computed).

  • human_review_reason (str or None) –

    Human-readable summary of why review is required (computed).

  Inherited lifecycle fields (see ReportBase):

  • status (Status) –

    Lifecycle status (see ReportBase).

  • percent (int) –

    Progress percentage (see ReportBase).

  • started_at (datetime | None) –

    When processing started (see ReportBase).

  • finished_at (datetime | None) –

    When processing finished (see ReportBase).

  • notes (list[str]) –

    Narrative messages (see ReportBase).

  • errors (list[str]) –

    Fatal issues (see ReportBase).

  • warnings (list[str]) –

    Non-fatal issues (see ReportBase).

  • metadata (dict[str, Any]) –

    Arbitrary structured context (see ReportBase).

  • review (ReviewFlag) –

    File-level HITL flag (see ReportBase).

  • report_version (str) –

    Schema version (see ReportBase).

Notes

See ReportBase for full semantics of lifecycle fields and methods (start, end, fail, etc.).

Methods:

  • _coerce_path
  • _flagged_steps

    Return steps that have requested human review.

  • _make_unique_step_id

    Generate a slugified, unique step id based on a label.

  • _recompute_percent

    Recompute :attr:percent as the fraction of completed steps.

  • add_completed_step

    Create a SUCCEEDED step and append it (chainable).

  • add_failed_step

    Create a FAILED step and append it (chainable).

  • add_review_step

    Create a step that requests HITL review, then append it.

  • add_skipped_step

    Create a SKIPPED step and append it (chainable).

  • append_step

    Finalize and append a step; recompute aggregate percent.

  • begin

    Construct and mark the file report as running.

  • clear_review

    Clear the HITL review flag.

  • end

    Finalize the unit if not already terminal.

  • error

    Append an error message (does not change status).

  • fail

    Finalize as failed (FAILED).

  • last_step

    Return the most recently appended step or None if empty.

  • model_post_init
  • note

    Append a user-facing note.

  • request_review

    Set the HITL review flag.

  • skip

    Finalize as skipped (SKIPPED).

  • start

    Mark the unit as running and stamp started_at if missing.

  • succeed

    Finalize successfully (SUCCEEDED) and set percent=100.

  • warn

    Append a non-fatal warning.

_pipeline class-attribute instance-attribute

_pipeline: Optional['PipelineReport'] = PrivateAttr(default=None)

defer_start class-attribute instance-attribute

defer_start: bool = Field(default=False, exclude=True, repr=False)

duration_ms property

duration_ms: float | None

Elapsed time in milliseconds.

Returns None if timing cannot be determined (e.g., no started_at). Uses finished_at when present; otherwise uses 'now' to reflect in-flight duration. Clamped at >= 0 and rounded to 3 decimals.

errors class-attribute instance-attribute

errors: List[str] = Field(default_factory=list)

failed property

failed: bool

Return True if the unit has failed.

file_id class-attribute instance-attribute

file_id: Optional[str] = None

finished_at class-attribute instance-attribute

finished_at: Optional[datetime] = None

human_review_reason property

human_review_reason: Optional[str]

Compact human-readable reason summarizing review needs (computed).

Combines file-level reason (if present) with a roll-up of flagged steps. Shows up to five step names and a “+N more” suffix if necessary.

Returns:

  • str or None

    Summary text or None if no review is requested.
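
One way the roll-up described above could be assembled is sketched here. The function name, the separators, and the "Steps flagged:" wording are all assumptions; only the five-name cap and the "+N more" suffix come from the description.

```python
from typing import List, Optional


def summarize_review(file_reason: Optional[str],
                     flagged_steps: List[str],
                     max_names: int = 5) -> Optional[str]:
    """Hypothetical roll-up; the separator and wording are assumptions."""
    parts: List[str] = []
    if file_reason:
        parts.append(file_reason)
    if flagged_steps:
        shown = ", ".join(flagged_steps[:max_names])
        extra = len(flagged_steps) - max_names
        if extra > 0:
            shown += f" +{extra} more"  # truncate long lists
        parts.append(f"Steps flagged: {shown}")
    return "; ".join(parts) if parts else None


names = [f"s{i}" for i in range(1, 8)]  # seven flagged steps
summary = summarize_review(None, names)
```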

label property

label: str

metadata class-attribute instance-attribute

metadata: Dict[str, Any] = Field(default_factory=dict)

mime_type property

mime_type: Optional[str]

n_steps class-attribute instance-attribute

n_steps: int = 1

name property

name: str

notes class-attribute instance-attribute

notes: List[str] = Field(default_factory=list)

ok property

ok: bool

Truthiness of success used by :meth:end.

Returns:

  • bool

    False if status is FAILED or any errors exist. True if status is SUCCEEDED. Otherwise, all(step.ok for step in steps) (or True if no steps).
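
The precedence of these three rules can be written as a plain function. This is a sketch of the documented rule only; `file_ok` and its string statuses are hypothetical, not the property's actual code.

```python
from typing import Iterable, List


def file_ok(status: str, errors: List[str], step_oks: Iterable[bool]) -> bool:
    """Hypothetical function form of the rule described above."""
    if status == "FAILED" or errors:
        return False  # explicit failure or any recorded error wins
    if status == "SUCCEEDED":
        return True   # explicit success is trusted next
    return all(step_oks)  # all([]) is True, so no steps counts as ok
```

Because the error check comes first, a unit marked SUCCEEDED that also carries errors is still reported as not ok.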

path instance-attribute

path: Path

pending property

pending

Return True if the unit is pending.

percent class-attribute instance-attribute

percent: int = 0

report_version class-attribute instance-attribute

report_version: str = SCHEMA_VERSION

requires_human_review property

requires_human_review: bool

Whether this file needs human review (computed).

True if the file itself is flagged or any contained step is flagged.

Returns:

  • bool

    True if review is required; otherwise False.

review class-attribute instance-attribute

review: ReviewFlag = Field(default_factory=ReviewFlag)

running property

running

Return True if the unit is running.

size_bytes property

size_bytes: Optional[int]

Best-effort size of the file in bytes. Avoids raising on missing or inaccessible paths.

skipped property

skipped

Return True if the unit was skipped.

started_at class-attribute instance-attribute

started_at: Optional[datetime] = None

status class-attribute instance-attribute

status: Status = PENDING

steps class-attribute instance-attribute

steps: List[StepReport] = Field(default_factory=list)

succeeded property

succeeded

Return True if the unit has succeeded.

terminal property

terminal

Return True if the unit is terminal.

warnings class-attribute instance-attribute

warnings: List[str] = Field(default_factory=list)

_coerce_path classmethod

_coerce_path(v)
Source code in src/pipeline_watcher/core.py
@field_validator("path", mode="before")
@classmethod
def _coerce_path(cls, v):
    if isinstance(v, (str, Path)):
        # Path("") normalizes to ".", so check the raw value for emptiness.
        if str(v) == "":
            raise ValueError("path cannot be empty")
        return Path(v)
    raise TypeError("path must be str or Path")
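
The coercion rule can be tried outside pydantic as a plain function. `coerce_path` is a hypothetical name; the sketch checks emptiness on the raw input because pathlib turns an empty string into ".".

```python
from pathlib import Path
from typing import Union


def coerce_path(v: Union[str, Path]) -> Path:
    """Hypothetical plain-function version of the validator's rule."""
    if not isinstance(v, (str, Path)):
        raise TypeError("path must be str or Path")
    # pathlib normalizes "" to ".", so emptiness is checked on the raw input.
    if str(v) == "":
        raise ValueError("path cannot be empty")
    return Path(v)


coerced = coerce_path("data/report.pdf")
```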

_flagged_steps

_flagged_steps() -> List[StepReport]

Return steps that have requested human review.

Returns:

  • list[StepReport]

    Subset of :attr:steps whose review.flagged is True.

Source code in src/pipeline_watcher/core.py
def _flagged_steps(self) -> List[StepReport]:
    """Return steps that have requested human review.

    Returns
    -------
    list[StepReport]
        Subset of :attr:`steps` whose ``review.flagged`` is ``True``.
    """
    out = []
    for s in self.steps:
        rf = getattr(s, "review", None)
        if rf and getattr(rf, "flagged", False):
            out.append(s)
    return out

_make_unique_step_id

_make_unique_step_id(label: str) -> str

Generate a slugified, unique step id based on a label.

If the slug already exists among current steps, appends -2, -3, etc., until unique.

Parameters:

  • label

    (str) –

    Human-readable label to slugify.

Returns:

  • str

    Unique step identifier.

Source code in src/pipeline_watcher/core.py
def _make_unique_step_id(self, label: str) -> str:
    """Generate a slugified, unique step id based on a label.

    If the slug already exists among current steps, appends ``-2``, ``-3``,
    etc., until unique.

    Parameters
    ----------
    label : str
        Human-readable label to slugify.

    Returns
    -------
    str
        Unique step identifier.
    """
    base = _slugify(label) or "step"
    existing = {s.id for s in self.steps}
    if base not in existing:
        return base
    i = 2
    while f"{base}-{i}" in existing:
        i += 1
    return f"{base}-{i}"
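
The suffixing scheme is easy to exercise in isolation. In this sketch `slugify` is an assumed rule (the library's `_slugify` is not shown here), and `unique_step_id` takes the existing ids as an argument instead of reading them from a report.

```python
import re
from typing import Iterable


def slugify(label: str) -> str:
    """Hypothetical slug rule: lowercase, non-alphanumerics become '-'."""
    return re.sub(r"[^a-z0-9]+", "-", label.lower()).strip("-")


def unique_step_id(label: str, existing: Iterable[str]) -> str:
    taken = set(existing)
    base = slugify(label) or "step"  # fall back when the slug is empty
    if base not in taken:
        return base
    i = 2
    while f"{base}-{i}" in taken:
        i += 1
    return f"{base}-{i}"


first = unique_step_id("Extract Text", [])
second = unique_step_id("Extract Text", ["extract-text"])
third = unique_step_id("Extract Text", ["extract-text", "extract-text-2"])
fallback = unique_step_id("!!!", [])
```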

_recompute_percent

_recompute_percent() -> None

Recompute percent as the rounded percentage of n_steps that have reached a terminal status (0 when n_steps is zero or no steps have been appended).

Source code in src/pipeline_watcher/core.py
def _recompute_percent(self) -> None:
    """Recompute :attr:`percent` as the fraction of completed steps.
    """
    if self.n_steps and self.steps:
        self.percent = int(100 * sum(1 for s in self.steps if s.terminal) / self.n_steps + 0.5)
    else:
        self.percent = 0
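
A worked example of the formula, as a free function. `percent_done` is a hypothetical name and takes a list of per-step terminal flags in place of real step objects.

```python
from typing import List


def percent_done(terminal_flags: List[bool], n_steps: int) -> int:
    """Hypothetical function form of the percent formula."""
    if n_steps and terminal_flags:
        done = sum(1 for t in terminal_flags if t)
        return int(100 * done / n_steps + 0.5)  # round half up
    return 0


one_of_three = percent_done([True], 3)
two_of_three = percent_done([True, True], 3)
over_budget = percent_done([True, True, True], 2)
```

The last case shows that the formula is not clamped: appending more terminal steps than n_steps produces a value above 100, so n_steps should match the real step count.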

add_completed_step

add_completed_step(label: str, *, id: str | None = None, note: str | None = None, metadata: dict | None = None) -> 'FileReport'

Create a SUCCEEDED step and append it (chainable).

Parameters:

  • label

    (str) –

    Step label for UI.

  • id

    (str, default: None ) –

    Explicit step id; if omitted, a unique id is derived from label.

  • note

    (str | None, default: None ) –

    Optional note appended to the step.

  • metadata

    (dict, default: None ) –

    Metadata merged into the step.

Returns:

  • FileReport

    Self.

Source code in src/pipeline_watcher/core.py
def add_completed_step(
    self,
    label: str,
    *,
    id: str | None = None,
    note: str | None = None,
    metadata: dict | None = None,
) -> "FileReport":
    """Create a ``SUCCEEDED`` step and append it (chainable).

    Parameters
    ----------
    label : str
        Step label for UI.
    id : str, optional
        Explicit step id; if omitted, a unique id is derived from ``label``.
    note : str, optional
        Optional note appended to the step.
    metadata : dict, optional
        Metadata merged into the step.

    Returns
    -------
    FileReport
        Self.
    """
    sid = id or self._make_unique_step_id(label)
    step = StepReport.begin(sid, label=label)  # same call shape as the other add_*_step helpers
    if metadata:
        step.metadata.update(metadata)
    if note:
        step.notes.append(note)
    step.succeed()  # terminal; append_step will end() again idempotently
    return self.append_step(step)

add_failed_step

add_failed_step(label: str, *, id: str | None = None, reason: str | None = None, metadata: dict | None = None) -> 'FileReport'

Create a FAILED step and append it (chainable).

Parameters:

  • label

    (str) –

    Step label.

  • id

    (str, default: None ) –

    Explicit id; if omitted, derived from label.

  • reason

    (str, default: None ) –

    Failure reason recorded on the step.

  • metadata

    (dict, default: None ) –

    Metadata merged into the step.

Returns:

  • FileReport

    Self.

Source code in src/pipeline_watcher/core.py
def add_failed_step(
    self,
    label: str,
    *,
    id: str | None = None,
    reason: str | None = None,
    metadata: dict | None = None,
) -> "FileReport":
    """Create a ``FAILED`` step and append it (chainable).

    Parameters
    ----------
    label : str
        Step label.
    id : str, optional
        Explicit id; if omitted, derived from ``label``.
    reason : str, optional
        Failure reason recorded on the step.
    metadata : dict, optional
        Metadata merged into the step.

    Returns
    -------
    FileReport
        Self.
    """
    sid = id or self._make_unique_step_id(label)
    step = StepReport.begin(sid, label=label)
    if metadata:
        step.metadata.update(metadata)
    step.fail(reason)
    return self.append_step(step)

add_review_step

add_review_step(label: str, *, id: str | None = None, reason: str | None = None, metadata: dict | None = None, mark_success: bool = True) -> 'FileReport'

Create a step that requests HITL review, then append it.

By default the step is marked SUCCEEDED (common pattern: “passed but needs review”). The file-level review flag is set when the step is appended, unless the file is already flagged.

Parameters:

  • label

    (str) –

    Step label.

  • id

    (str, default: None ) –

    Explicit id; if omitted, derived from label.

  • reason

    (str, default: None ) –

    UI-visible reason for review.

  • metadata

    (dict, default: None ) –

    Extra context for reviewers.

  • mark_success

    (bool, default: True ) –

    If True, mark the step SUCCEEDED; otherwise its status is inferred by end() when the step is appended.

Returns:

  • FileReport

    Self.

Source code in src/pipeline_watcher/core.py
def add_review_step(
    self,
    label: str,
    *,
    id: str | None = None,
    reason: str | None = None,
    metadata: dict | None = None,
    mark_success: bool = True,
) -> "FileReport":
    """Create a step that requests HITL review, then append it.

    By default the step is marked ``SUCCEEDED`` (common pattern: “passed
    but needs review”). The file-level review flag will be set when
    appended if the file isn’t already flagged.

    Parameters
    ----------
    label : str
        Step label.
    id : str, optional
        Explicit id; if omitted, derived from ``label``.
    reason : str, optional
        UI-visible reason for review.
    metadata : dict, optional
        Extra context for reviewers.
    mark_success : bool, default True
        If ``True``, mark the step ``SUCCEEDED``; otherwise leave status as-is.

    Returns
    -------
    FileReport
        Self.
    """
    sid = id or self._make_unique_step_id(label)
    step = StepReport.begin(sid, label=label).request_review(reason)
    if metadata:
        step.metadata.update(metadata)
    if mark_success:
        step.succeed()
    # If your append_step rolls review up to the FileReport, that'll happen there.
    return self.append_step(step)

add_skipped_step

add_skipped_step(label: str, *, id: str | None = None, reason: str | None = None, metadata: dict | None = None) -> 'FileReport'

Create a SKIPPED step and append it (chainable).

Parameters:

  • label

    (str) –

    Step label.

  • id

    (str, default: None ) –

    Explicit id; if omitted, derived from label.

  • reason

    (str, default: None ) –

    Skip rationale (added to notes).

  • metadata

    (dict, default: None ) –

    Metadata merged into the step.

Returns:

  • FileReport

    Self.

Source code in src/pipeline_watcher/core.py
def add_skipped_step(
    self,
    label: str,
    *,
    id: str | None = None,
    reason: str | None = None,
    metadata: dict | None = None,
) -> "FileReport":
    """Create a ``SKIPPED`` step and append it (chainable).

    Parameters
    ----------
    label : str
        Step label.
    id : str, optional
        Explicit id; if omitted, derived from ``label``.
    reason : str, optional
        Skip rationale (added to notes).
    metadata : dict, optional
        Metadata merged into the step.

    Returns
    -------
    FileReport
        Self.
    """
    sid = id or self._make_unique_step_id(label)
    step = StepReport.begin(sid, label=label)
    if metadata:
        step.metadata.update(metadata)
    step.skip(reason)
    return self.append_step(step)

append_step

append_step(step: StepReport, max_steps: int = 10000) -> 'FileReport'

Finalize and append a step; recompute aggregate percent.

The step is finalized via StepReport.end, appended to steps, and the file percent is recomputed as the share of terminal steps out of n_steps. If the step requests HITL review and the file is not already flagged, the file's review is set.

Parameters:

  • step

    (StepReport) –

    Step to finalize and append.

  • max_steps

    (int, default: 10000 ) –

    The maximum number of steps allowed (accepted by the signature but not referenced in the source shown below).

Returns:

  • FileReport

    Self (chainable).

Source code in src/pipeline_watcher/core.py
def append_step(self,
                step: StepReport,
                max_steps: int = 10_000) -> "FileReport":
    """Finalize and append a step; recompute aggregate percent.

    The step is finalized via [StepReport.end](.#pipeline_watcher.StepReport.end), appended to
    [steps](.#pipeline_watcher.FileReport.steps), and the file percent is recomputed as the share of
    terminal steps out of ``n_steps``. If the step requests HITL review and
    the file is not already flagged, the file's [review](.#pipeline_watcher.FileReport.review) is set.

    Parameters
    ----------
    step : StepReport
        Step to finalize and append.
    max_steps : int, optional
        The maximum number of steps allowed.

    Returns
    -------
    FileReport
        Self (chainable).
    """
    step.end()
    if not step.id:
        step.id = construct_unique_step_id_from_label(step.label, self.steps)
    step.id = make_step_id_unique(step.id, self.steps)
    self.steps.append(step)
    self._recompute_percent()
    # roll-up HITL review if you added ReviewFlag earlier
    if step.review.flagged and not self.review.flagged:
        self.review = ReviewFlag(flagged=True,
                                 reason=step.review.reason or f"Step '{step.id}' requested review")
    return self
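
The review roll-up in append_step follows a "first flagged step wins" pattern, sketched below with dataclass stand-ins. `Flag`, `MiniStep`, and `MiniFile` are hypothetical simplifications; finalizing the step, id handling, and percent updates are omitted.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Flag:
    flagged: bool = False
    reason: Optional[str] = None


@dataclass
class MiniStep:
    id: str
    review: Flag = field(default_factory=Flag)


@dataclass
class MiniFile:
    steps: List[MiniStep] = field(default_factory=list)
    review: Flag = field(default_factory=Flag)

    def append_step(self, step: MiniStep) -> "MiniFile":
        self.steps.append(step)
        # First flagged step wins; later ones do not overwrite the reason.
        if step.review.flagged and not self.review.flagged:
            self.review = Flag(True, step.review.reason
                               or f"Step '{step.id}' requested review")
        return self


report = (MiniFile()
          .append_step(MiniStep("extract"))
          .append_step(MiniStep("ocr", Flag(True)))
          .append_step(MiniStep("classify", Flag(True, "low confidence"))))
```

Here the file-level reason comes from the "ocr" step, which had no reason of its own, so the fallback text is used; the later "classify" reason is kept on its step but does not replace it.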

begin classmethod

begin(path: Path | str, file_id: str | None = None, n_steps: int = 1, metadata: dict | None = None) -> 'FileReport'

Construct and mark the file report as running.

Parameters:

  • path

    (Path | str) –

    Path to the file.

  • file_id

    (str | None, default: None ) –

    Stable file identifier.

  • n_steps

    (int, default: 1 ) –

    Number of steps in process.

  • metadata

    (dict | None, default: None ) –

    Dictionary of metadata about the file.

Returns:

  • FileReport

    Started file report (status=RUNNING).

Source code in src/pipeline_watcher/core.py
@classmethod
def begin(cls,
          path: Path | str,
          file_id: str | None = None,
          n_steps: int = 1,
          metadata: dict | None = None
) -> "FileReport":
    """Construct and mark the file report as running.

    Parameters
    ----------
    path : Path or str
        Path to the file.
    file_id : str, optional
        Stable file identifier.
    n_steps : int, optional
        Number of steps in the process.
    metadata : dict, optional
        Dictionary of metadata about the file.

    Returns
    -------
    FileReport
        Started file report (``status=RUNNING``).
    """
    return cls(path=Path(path),
               file_id=file_id,
               n_steps=n_steps,
               metadata=dict(metadata) if metadata else {}).start()

clear_review

clear_review() -> HasReviewTV

Clear the HITL review flag.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def clear_review(self: HasReviewTV) -> HasReviewTV:
    """Clear the HITL review flag.

    Returns
    -------
    ReportBase
        Self.
    """
    self.review = ReviewFlag()
    return self

end

end() -> 'ReportBase'

Finalize the unit if not already terminal.

If already terminal (SUCCEEDED, FAILED, SKIPPED), this stamps finished_at if missing and returns. Otherwise, it infers success from ok (roll-up of step outcomes) and calls succeed or fail accordingly.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def end(self) -> "ReportBase":
    """Finalize the unit if not already terminal.

    If already terminal (``SUCCEEDED``, ``FAILED``, ``SKIPPED``), this stamps
    :attr:`finished_at` if missing and returns. Otherwise, infers success
    from :attr:`ok` (roll-up of step outcomes) and calls :meth:`succeed`
    or :meth:`fail` accordingly.

    Returns
    -------
    ReportBase
        Self.
    """
    if self.status.terminal:
        if not self.finished_at:
            self.finished_at = _now()
        return self
    return self.succeed() if self.ok else self.fail("One or more file steps failed")

error

error(msg: str) -> 'ReportBase'

Append an error message (does not change status).

Note
Prefer fail when the unit should end as FAILED. error only records the message and leaves the status unchanged.

Parameters:

  • msg

    (str) –

    Message to append to :attr:errors.

Returns:

  • ReportBase

    Self.

Notes

Use :meth:fail to change terminal status to FAILED. This method only records text.

Source code in src/pipeline_watcher/core.py
def error(self, msg: str) -> "ReportBase":
    """Append an error message (does not change status).

    Note
    ----
        Prefer :meth:`fail` when the unit should end as ``FAILED``. ``error`` only records the message and leaves the status unchanged.

    Parameters
    ----------
    msg : str
        Message to append to :attr:`errors`.

    Returns
    -------
    ReportBase
        Self.

    Notes
    -----
    Use :meth:`fail` to change terminal status to ``FAILED``. This method
    only records text.
    """
    self.errors.append(msg)
    return self

fail

fail(message: Optional[str] = None) -> 'ReportBase'

Finalize as failed (FAILED).

Parameters:

  • message

    (str, default: None ) –

    Error text to append to :attr:errors.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def fail(self, message: Optional[str] = None) -> "ReportBase":
    """Finalize as failed (``FAILED``).

    Parameters
    ----------
    message : str, optional
        Error text to append to :attr:`errors`.

    Returns
    -------
    ReportBase
        Self.
    """
    self.status = Status.FAILED
    if message:
        self.error(message)
    self.finished_at = _now()
    return self

last_step

last_step() -> StepReport | None

Return the most recently appended step or None if empty.

Returns:

  • StepReport or None

    Last step in :attr:steps, if any.

Source code in src/pipeline_watcher/core.py
def last_step(self) -> StepReport | None:
    """Return the most recently appended step or ``None`` if empty.

    Returns
    -------
    StepReport or None
        Last step in :attr:`steps`, if any.
    """
    return self.steps[-1] if self.steps else None

model_post_init

model_post_init(__context) -> None
Source code in src/pipeline_watcher/core.py
def model_post_init(self, __context) -> None:
    # auto-start unless caller explicitly defers
    if not self.defer_start and self.pending:
        self.start()
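
The auto-start behavior can be sketched without pydantic. In this hypothetical stand-in, the dataclass hook `__post_init__` plays the role of `model_post_init`, and status is a plain string.

```python
from dataclasses import dataclass


@dataclass
class MiniUnit:
    # Hypothetical stand-in, not the library class.
    defer_start: bool = False
    status: str = "PENDING"

    def start(self) -> "MiniUnit":
        self.status = "RUNNING"
        return self

    def __post_init__(self) -> None:
        # Auto-start unless the caller defers or the unit is past PENDING.
        if not self.defer_start and self.status == "PENDING":
            self.start()


auto = MiniUnit()                      # starts on construction
deferred = MiniUnit(defer_start=True)  # stays pending until start() is called
```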

note

note(msg: str) -> 'ReportBase'

Append a user-facing note.

Parameters:

  • msg

    (str) –

    Message to append to :attr:notes.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def note(self, msg: str) -> "ReportBase":
    """Append a user-facing note.

    Parameters
    ----------
    msg : str
        Message to append to :attr:`notes`.

    Returns
    -------
    ReportBase
        Self.
    """
    self.notes.append(msg)
    return self

request_review

request_review(reason: str | None = None) -> HasReviewTV

Set the HITL review flag.

Parameters:

  • reason

    (str, default: None ) –

    Short UI-visible reason for requesting review.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def request_review(self: HasReviewTV, reason: str | None = None) -> HasReviewTV:
    """Set the HITL review flag.

    Parameters
    ----------
    reason : str, optional
        Short UI-visible reason for requesting review.

    Returns
    -------
    ReportBase
        Self.
    """
    self.review = ReviewFlag(flagged=True, reason=reason)
    return self
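
A round trip through the review flag might look like the sketch below. The `ReviewFlag` defaults (unflagged, no reason) are an assumption inferred from `clear_review` resetting the field to `ReviewFlag()`. `MiniUnit` is a hypothetical stand-in.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ReviewFlag:
    # Assumed shape: unflagged by default, with an optional reason.
    flagged: bool = False
    reason: Optional[str] = None


@dataclass
class MiniUnit:
    # Hypothetical stand-in, not the library class.
    review: ReviewFlag = field(default_factory=ReviewFlag)

    def request_review(self, reason: Optional[str] = None) -> "MiniUnit":
        self.review = ReviewFlag(flagged=True, reason=reason)
        return self

    def clear_review(self) -> "MiniUnit":
        # A fresh default flag drops the reason as well.
        self.review = ReviewFlag()
        return self


unit = MiniUnit().request_review("low OCR confidence")
flagged_before = unit.review.flagged
reason_before = unit.review.reason
unit.clear_review()
```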

skip

skip(reason: Optional[str] = None) -> 'ReportBase'

Finalize as skipped (SKIPPED).

Parameters:

  • reason

    (str, default: None ) –

    Rationale appended to :attr:notes as "Skipped: {reason}".

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def skip(self, reason: Optional[str] = None) -> "ReportBase":
    """Finalize as skipped (``SKIPPED``).

    Parameters
    ----------
    reason : str, optional
        Rationale appended to :attr:`notes` as ``"Skipped: {reason}"``.

    Returns
    -------
    ReportBase
        Self.
    """
    self.status = Status.SKIPPED
    if reason:
        self.notes.append(f"Skipped: {reason}")
    self.finished_at = _now()
    return self

start

start() -> 'ReportBase'

Mark the unit as running and stamp started_at if missing.

Returns:

  • ReportBase

    Self (for fluent chaining).

Examples:

>>> step = StepReport(...)
>>> step.start().note("begin").percent == 0
True
Source code in src/pipeline_watcher/core.py
def start(self) -> "ReportBase":
    """Mark the unit as running and stamp ``started_at`` if missing.

    Returns
    -------
    ReportBase
        Self (for fluent chaining).

    Examples
    --------
    >>> step = StepReport(...)
    >>> step.start().note("begin").percent == 0
    True
    """
    self.status = Status.RUNNING
    if not self.started_at:
        self.started_at = _now()
    return self
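
One consequence of the "if missing" guard is that restarting a unit keeps the original start time. A minimal sketch, with `MiniUnit` as a hypothetical stand-in:

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class MiniUnit:
    # Hypothetical stand-in, not the library class.
    status: str = "PENDING"
    started_at: Optional[datetime] = None

    def start(self) -> "MiniUnit":
        self.status = "RUNNING"
        if not self.started_at:
            # Stamp only once; later calls keep the first timestamp.
            self.started_at = datetime.now(timezone.utc)
        return self


unit = MiniUnit().start()
first_stamp = unit.started_at
unit.start()  # second call must not overwrite started_at
```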

succeed

succeed() -> 'ReportBase'

Finalize successfully (SUCCEEDED) and set percent=100.

Also stamps finished_at to the current time.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def succeed(self) -> "ReportBase":
    """Finalize successfully (``SUCCEEDED``) and set ``percent=100``.

    Also stamps ``finished_at`` to the current time.

    Returns
    -------
    ReportBase
        Self.
    """
    self.status = Status.SUCCEEDED
    self.percent = 100
    self.finished_at = _now()
    return self

warn

warn(msg: str) -> 'ReportBase'

Append a non-fatal warning.

Parameters:

  • msg

    (str) –

    Message to append to :attr:warnings.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def warn(self, msg: str) -> "ReportBase":
    """Append a non-fatal warning.

    Parameters
    ----------
    msg : str
        Message to append to :attr:`warnings`.

    Returns
    -------
    ReportBase
        Self.
    """
    self.warnings.append(msg)
    return self

StepReport

Bases: ReportBase

Single unit of work within a file or batch.

A step succeeds if it is explicitly marked SUCCEEDED or, when not terminal, if all recorded checks pass and no errors are present.

Aggregates validation checks and lifecycle metadata inherited from ReportBase.

Attributes:

Notes
  • id may be omitted; containers such as FileReport often assign a unique ID.
  • The auto-generated initializer accepts the same fields as attributes.
  • Lifecycle semantics (start, end, succeed, fail, skip) are defined in ReportBase.

Examples:

>>> st = StepReport.begin("Extract text (OCR)")
>>> st.add_check("ocr_quality>=0.9", ok=True)
>>> _ = st.end()  # end is typically called by FileReport.append_step
>>> st.succeeded
True
>>> st.terminal
True
>>> st.duration_ms is not None  # actual value depends on elapsed time
True

Methods:

  • _default_id
  • add_check

    Record a boolean validation result.

  • begin

    Construct and mark the step as started.

  • clear_review

    Clear the HITL review flag.

  • end

    Finalize the unit if not already terminal.

  • error

    Append an error message (does not change status).

  • fail

    Finalize as failed (FAILED).

  • model_post_init
  • note

    Append a user-facing note.

  • request_review

    Set the HITL review flag.

  • skip

    Finalize as skipped (SKIPPED).

  • start

    Mark the unit as running and stamp started_at if missing.

  • succeed

    Finalize successfully (SUCCEEDED) and set percent=100.

  • warn

    Append a non-fatal warning.

checks class-attribute instance-attribute

checks: List[Check] = Field(default_factory=list)

defer_start class-attribute instance-attribute

defer_start: bool = Field(default=False, exclude=True, repr=False)

duration_ms property

duration_ms: float | None

Elapsed time in milliseconds.

Returns None if timing cannot be determined (e.g., no started_at). Uses finished_at when present; otherwise uses 'now' to reflect in-flight duration. Clamped at >= 0 and rounded to 3 decimals.
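
The rule described above can be written out as a small function. This is a reconstruction from the description, not the library's code. `duration_ms` here is a hypothetical free function that takes the two timestamps as arguments.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def duration_ms(started_at: Optional[datetime],
                finished_at: Optional[datetime] = None) -> Optional[float]:
    # Hypothetical helper following the documented rule.
    if started_at is None:
        return None  # timing cannot be determined
    # Fall back to "now" so in-flight units report a running duration.
    end = finished_at or datetime.now(timezone.utc)
    elapsed = (end - started_at).total_seconds() * 1000.0
    return round(max(elapsed, 0.0), 3)  # clamp at 0, keep 3 decimals


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
finished = duration_ms(t0, t0 + timedelta(milliseconds=314))
clamped = duration_ms(t0, t0 - timedelta(seconds=1))  # finish before start
in_flight = duration_ms(t0)                           # measured against now
```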

errors class-attribute instance-attribute

errors: List[str] = Field(default_factory=list)

failed property

failed: bool

Return True if the unit has failed.

finished_at class-attribute instance-attribute

finished_at: Optional[datetime] = None

id class-attribute instance-attribute

id: Optional[str] = None

label instance-attribute

label: str

metadata class-attribute instance-attribute

metadata: Dict[str, Any] = Field(default_factory=dict)

notes class-attribute instance-attribute

notes: List[str] = Field(default_factory=list)

ok property

ok: bool

Whether the step counts as successful, derived from status, errors, and checks.

Returns:

  • bool

    False if status is FAILED or any errors exist. True if status is SUCCEEDED. Otherwise, all(check.ok for check in checks) (or True if no checks).
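
The order of precedence matters: errors win over an explicit success, and an explicit success wins over failing checks. A minimal sketch, assuming string statuses and a list of booleans in place of `Check` objects (`step_ok` is a hypothetical helper):

```python
from typing import List


def step_ok(status: str, errors: List[str], checks: List[bool]) -> bool:
    # Hypothetical function mirroring the documented precedence.
    if status == "FAILED" or errors:
        return False
    if status == "SUCCEEDED":
        return True
    # all([]) is True, so a step with no checks is considered ok.
    return all(checks)


no_checks = step_ok("RUNNING", [], [])
one_failing = step_ok("RUNNING", [], [True, False])
explicit_success = step_ok("SUCCEEDED", [], [False])
success_with_error = step_ok("SUCCEEDED", ["boom"], [])
```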

pending property

pending

Return True if the unit is pending.

percent class-attribute instance-attribute

percent: int = 0

report_version class-attribute instance-attribute

report_version: str = SCHEMA_VERSION

requires_human_review property

requires_human_review: bool

Whether the unit is flagged for human review.

Returns:

  • bool

    True if :attr:review.flagged is set, else False.

review class-attribute instance-attribute

review: ReviewFlag = Field(default_factory=ReviewFlag)

running property

running

Return True if the unit is running.

skipped property

skipped

Return True if the unit was skipped.

started_at class-attribute instance-attribute

started_at: Optional[datetime] = None

status class-attribute instance-attribute

status: Status = PENDING

succeeded property

succeeded

Return True if the unit has succeeded.

terminal property

terminal

Return True if the unit is terminal.

warnings class-attribute instance-attribute

warnings: List[str] = Field(default_factory=list)

_default_id

_default_id()
Source code in src/pipeline_watcher/core.py
@model_validator(mode="after")
def _default_id(self):
    # Treat None/"" as unset; drop the `or self.id == ""` part if "" is meaningful
    if self.id is None or self.id == "":
        # When frozen, use object.__setattr__ during construction-time adjustments
        object.__setattr__(self, "id", _slugify(self.label))
    return self
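
The fallback from label to id can be illustrated as follows. The library's `_slugify` is not shown on this page, so the `slugify` below is an assumed implementation (lowercase, non-alphanumeric runs collapsed to hyphens) and its exact output may differ from the real helper. `default_id` is likewise hypothetical.

```python
import re
from typing import Optional


def slugify(label: str) -> str:
    # Assumed behavior; the library's _slugify may differ.
    return re.sub(r"[^a-z0-9]+", "-", label.lower()).strip("-")


def default_id(label: str, id: Optional[str] = None) -> str:
    # None and "" both count as unset, as in the validator above.
    return id if id else slugify(label)


derived = default_id("Extract text (OCR)")
explicit = default_id("Extract text (OCR)", "ocr")
from_empty = default_id("Validate", "")
```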

add_check

add_check(name: str, ok: bool, detail: Optional[str] = None) -> None

Record a boolean validation result.

Parameters:

  • name

    (str) –

    Check identifier (e.g., "manifest_present").

  • ok

    (bool) –

    Outcome of the check.

  • detail

    (str, default: None ) –

    Additional context for UI/debugging.

Examples:

>>> st = StepReport.begin("validate")
>>> st.add_check("ids_unique", ok=False, detail="3 duplicates")
>>> st.ok
False
Source code in src/pipeline_watcher/core.py
def add_check(self, name: str, ok: bool, detail: Optional[str] = None) -> None:
    """Record a boolean validation result.

    Parameters
    ----------
    name : str
        Check identifier (e.g., ``"manifest_present"``).
    ok : bool
        Outcome of the check.
    detail : str, optional
        Additional context for UI/debugging.

    Examples
    --------
    >>> st = StepReport.begin("validate")
    >>> st.add_check("ids_unique", ok=False, detail="3 duplicates")
    >>> st.ok
    False
    """
    self.checks.append(Check(name=name, ok=ok, detail=detail))
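
Because each call appends a `Check`, the recorded results can later be filtered, for example to list the failing ones. A minimal sketch: the `Check` fields follow the constructor call above, while `MiniStep` is a hypothetical stand-in built on plain dataclasses.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Check:
    # Fields taken from the Check(name=..., ok=..., detail=...) call above.
    name: str
    ok: bool
    detail: Optional[str] = None


@dataclass
class MiniStep:
    # Hypothetical stand-in, not the library class.
    checks: List[Check] = field(default_factory=list)

    def add_check(self, name: str, ok: bool,
                  detail: Optional[str] = None) -> None:
        self.checks.append(Check(name=name, ok=ok, detail=detail))


step = MiniStep()
step.add_check("manifest_present", ok=True)
step.add_check("ids_unique", ok=False, detail="3 duplicates")

# Collect the names of failing checks for display.
failing = [c.name for c in step.checks if not c.ok]
```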

begin classmethod

begin(label: str, *, id: str | None = None) -> 'StepReport'

Construct and mark the step as started.

Parameters:

  • label

    (str) –

    Human-friendly label.

  • id

    (str | None, default: None ) –

    Step identifier.

Returns:

  • StepReport

    Started step report (status=RUNNING).

Source code in src/pipeline_watcher/core.py
@classmethod
def begin(cls, label: str, *, id: str | None = None) -> "StepReport":
    """Construct and mark the step as started.

    Parameters
    ----------
    label : str
        Human-friendly label.
    id : str | None, optional
        Step identifier.

    Returns
    -------
    StepReport
        Started step report (``status=RUNNING``).
    """
    if id is None or id == "":
        id = _slugify(label)
    return cls(label=label, id=id).start()

clear_review

clear_review() -> HasReviewTV

Clear the HITL review flag.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def clear_review(self: HasReviewTV) -> HasReviewTV:
    """Clear the HITL review flag.

    Returns
    -------
    ReportBase
        Self.
    """
    self.review = ReviewFlag()
    return self

end

end() -> 'ReportBase'

Finalize the unit if not already terminal.

If already terminal (SUCCEEDED, FAILED, SKIPPED), this stamps :attr:finished_at if missing and returns. Otherwise, it infers the outcome from :attr:ok (roll-up of step outcomes) and calls :meth:succeed or :meth:fail accordingly.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def end(self) -> "ReportBase":
    """Finalize the unit if not already terminal.

    If already terminal (``SUCCEEDED``, ``FAILED``, ``SKIPPED``), this stamps
    :attr:`finished_at` if missing and returns. Otherwise, infers success
    from :attr:`ok` (roll-up of step outcomes) and calls :meth:`succeed`
    or :meth:`fail` accordingly.

    Returns
    -------
    ReportBase
        Self.
    """
    if self.status.terminal:
        if not self.finished_at:
            self.finished_at = _now()
        return self
    return self.succeed() if self.ok else self.fail("One or more file steps failed")

error

error(msg: str) -> 'ReportBase'

Append an error message (does not change status).

Note
Prefer fail for recording a failure. error only appends text and leaves status unchanged.

Parameters:

  • msg

    (str) –

    Message to append to :attr:errors.

Returns:

  • ReportBase

    Self.

Notes

Use :meth:fail to change terminal status to FAILED. This method only records text.

Source code in src/pipeline_watcher/core.py
def error(self, msg: str) -> "ReportBase":
    """Append an error message (does not change status).

    Note
    ----
    Prefer :meth:`fail` for recording a failure. ``error`` only appends text
    and leaves status unchanged.

    Parameters
    ----------
    msg : str
        Message to append to :attr:`errors`.

    Returns
    -------
    ReportBase
        Self.

    Notes
    -----
    Use :meth:`fail` to change terminal status to ``FAILED``. This method
    only records text.
    """
    self.errors.append(msg)
    return self

fail

fail(message: Optional[str] = None) -> 'ReportBase'

Finalize as failed (FAILED).

Parameters:

  • message

    (str, default: None ) –

    Error text to append to :attr:errors.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def fail(self, message: Optional[str] = None) -> "ReportBase":
    """Finalize as failed (``FAILED``).

    Parameters
    ----------
    message : str, optional
        Error text to append to :attr:`errors`.

    Returns
    -------
    ReportBase
        Self.
    """
    self.status = Status.FAILED
    if message:
        self.error(message)
    self.finished_at = _now()
    return self

model_post_init

model_post_init(__context) -> None
Source code in src/pipeline_watcher/core.py
def model_post_init(self, __context) -> None:
    # auto-start unless caller explicitly defers
    if not self.defer_start and self.pending:
        self.start()

note

note(msg: str) -> 'ReportBase'

Append a user-facing note.

Parameters:

  • msg

    (str) –

    Message to append to :attr:notes.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def note(self, msg: str) -> "ReportBase":
    """Append a user-facing note.

    Parameters
    ----------
    msg : str
        Message to append to :attr:`notes`.

    Returns
    -------
    ReportBase
        Self.
    """
    self.notes.append(msg)
    return self

request_review

request_review(reason: str | None = None) -> HasReviewTV

Set the HITL review flag.

Parameters:

  • reason

    (str, default: None ) –

    Short UI-visible reason for requesting review.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def request_review(self: HasReviewTV, reason: str | None = None) -> HasReviewTV:
    """Set the HITL review flag.

    Parameters
    ----------
    reason : str, optional
        Short UI-visible reason for requesting review.

    Returns
    -------
    ReportBase
        Self.
    """
    self.review = ReviewFlag(flagged=True, reason=reason)
    return self

skip

skip(reason: Optional[str] = None) -> 'ReportBase'

Finalize as skipped (SKIPPED).

Parameters:

  • reason

    (str, default: None ) –

    Rationale appended to :attr:notes as "Skipped: {reason}".

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def skip(self, reason: Optional[str] = None) -> "ReportBase":
    """Finalize as skipped (``SKIPPED``).

    Parameters
    ----------
    reason : str, optional
        Rationale appended to :attr:`notes` as ``"Skipped: {reason}"``.

    Returns
    -------
    ReportBase
        Self.
    """
    self.status = Status.SKIPPED
    if reason:
        self.notes.append(f"Skipped: {reason}")
    self.finished_at = _now()
    return self

start

start() -> 'ReportBase'

Mark the unit as running and stamp started_at if missing.

Returns:

  • ReportBase

    Self (for fluent chaining).

Examples:

>>> step = StepReport(...)
>>> step.start().note("begin").percent == 0
True
Source code in src/pipeline_watcher/core.py
def start(self) -> "ReportBase":
    """Mark the unit as running and stamp ``started_at`` if missing.

    Returns
    -------
    ReportBase
        Self (for fluent chaining).

    Examples
    --------
    >>> step = StepReport(...)
    >>> step.start().note("begin").percent == 0
    True
    """
    self.status = Status.RUNNING
    if not self.started_at:
        self.started_at = _now()
    return self

succeed

succeed() -> 'ReportBase'

Finalize successfully (SUCCEEDED) and set percent=100.

Also stamps finished_at to the current time.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def succeed(self) -> "ReportBase":
    """Finalize successfully (``SUCCEEDED``) and set ``percent=100``.

    Also stamps ``finished_at`` to the current time.

    Returns
    -------
    ReportBase
        Self.
    """
    self.status = Status.SUCCEEDED
    self.percent = 100
    self.finished_at = _now()
    return self

warn

warn(msg: str) -> 'ReportBase'

Append a non-fatal warning.

Parameters:

  • msg

    (str) –

    Message to append to :attr:warnings.

Returns:

  • ReportBase

    Self.

Source code in src/pipeline_watcher/core.py
def warn(self, msg: str) -> "ReportBase":
    """Append a non-fatal warning.

    Parameters
    ----------
    msg : str
        Message to append to :attr:`warnings`.

    Returns
    -------
    ReportBase
        Self.
    """
    self.warnings.append(msg)
    return self