PipelineReport

Bases: BaseModel

Batch-level container with ordered batch steps and per-file reports.

Designed to be JSON-serializable (Pydantic v2) and UI-friendly. Maintains a progress banner (stage, percent, message, updated_at), an ordered list of batch-level StepReport items, and a list of FileReport records.

Attributes:

  • label (str) –

    Human-friendly batch label (e.g., project/run name).

  • kind ({'validation', 'process', 'test'}) –

    Category of run for UI filtering and routing.

  • stage (str) –

    Short machine-friendly stage label for the progress banner.

  • percent (int) –

    Overall progress 0..100 (informational; not enforced).

  • message (str) –

    Human-readable progress note for dashboards.

  • updated_at (datetime) –

    Last update timestamp (UTC). Defaults to now on construction.

  • report_version (str) –

    Schema version emitted to artifacts.

  • steps (list[StepReport]) –

    Ordered sequence of batch-level steps (rare compared to file steps, but useful for pre/post stages like discovery/validation).

  • files (list[FileReport]) –

    Per-file timelines collected in this batch.

  • output_path (Path or None) –

    Default output path for save.

  • status (Status) –

    Aggregated lifecycle status over all batch steps and file reports (computed). See Status (pipeline_watcher.core.Status) and PipelineReport.status for roll-up rules.

Notes
  • The model is append-only for auditability.
  • Use set_progress to update the banner; it stamps updated_at.

Methods:

  • add_completed_step

    Construct, finalize, append, and return a batch-level step. Ensures id uniqueness.

  • append_file

    Finalize and append a FileReport; update updated_at.

  • append_step

    Finalize and append a batch-level step; update updated_at.

  • file_processed

    Whether a file has reached a terminal/finished state.

  • file_seen

    Whether a file with the given key appears in the report at all.

  • files_for

    Return all file reports matching an id/name/path/basename.

  • get_file

    Return the first matching file report or None if absent.

  • iter_steps

    Iterate over batch-level steps, optionally filtering by status.

  • last_step

    Return the last batch-level step or None if empty.

  • recompute_overall_from_steps

    Recalculate overall percent from batch-level steps.

  • save

    Write the report as JSON to path, output_path, or a default location.

  • set_progress

    Update the progress banner and timestamp.

  • table_rows_for_files_map

    Produce Django/Jinja-friendly summary rows for a files mapping.

  • unseen_expected

    Return expected filenames/ids/paths that are not present.

files class-attribute instance-attribute

files: List[FileReport] = Field(default_factory=list)

kind class-attribute instance-attribute

kind: Literal['validation', 'process', 'test'] = 'process'

label instance-attribute

label: str

message class-attribute instance-attribute

message: str = ''

output_path class-attribute instance-attribute

output_path: Optional[Path] = None

percent class-attribute instance-attribute

percent: int = 0

report_version class-attribute instance-attribute

report_version: str = SCHEMA_VERSION

stage class-attribute instance-attribute

stage: str = ''

status property

status: Status

Aggregated lifecycle status over all batch steps and file reports.

Rules (in priority order)
  • FAILED → if any step or file has failed.
  • RUNNING → if none failed but at least one is running.
  • SKIPPED → if there is at least one unit and all are skipped.
  • SUCCEEDED → if all units are terminal, none failed, and at least one succeeded.
  • PENDING → otherwise (e.g., no units, or only pending units).
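
The priority rules above can be sketched as a standalone function. This is a minimal illustration rather than the library's implementation: the Status enum below is a hypothetical stand-in that only carries the five names from the rule list, and roll_up is an illustrative name.

```python
from enum import Enum
from typing import Iterable


class Status(str, Enum):
    """Hypothetical stand-in for pipeline_watcher.core.Status."""
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    SKIPPED = "SKIPPED"


# Statuses that count as "finished" for the SUCCEEDED rule.
TERMINAL = {Status.SUCCEEDED, Status.FAILED, Status.SKIPPED}


def roll_up(unit_statuses: Iterable[Status]) -> Status:
    """Aggregate step/file statuses in the priority order listed above."""
    units = list(unit_statuses)
    if any(s is Status.FAILED for s in units):
        return Status.FAILED
    if any(s is Status.RUNNING for s in units):
        return Status.RUNNING
    if units and all(s is Status.SKIPPED for s in units):
        return Status.SKIPPED
    if units and all(s in TERMINAL for s in units) and Status.SUCCEEDED in units:
        return Status.SUCCEEDED
    # No units at all, or a mix that still contains pending work.
    return Status.PENDING
```

Under these assumptions, a batch with one succeeded and one skipped unit rolls up to SUCCEEDED, while a batch with one succeeded and one pending unit stays PENDING.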

steps class-attribute instance-attribute

steps: List[StepReport] = Field(default_factory=list)

updated_at class-attribute instance-attribute

updated_at: datetime = Field(default_factory=now_utc)

add_completed_step

add_completed_step(label: str, *, id: str | None = None) -> StepReport

Construct, finalize, append, and return a batch-level step. Ensures id uniqueness.

Parameters:

  • label

    (str) –

    Human-friendly label.

  • id

    (str, default: None ) –

    Step identifier. If omitted, a unique id is derived from label.

Returns:

  • StepReport

    The created (and ended) step.

Source code in src/pipeline_watcher/core.py
def add_completed_step(self, label: str, *, id: str | None = None) -> StepReport:
    """Construct, finalize, append, and return a batch-level step. Ensures id uniqueness.

    Parameters
    ----------
    label : str
        Human-friendly label.
    id : str, optional
        Step identifier. If omitted, a unique id is derived from ``label``.

    Returns
    -------
    StepReport
        The created (and ended) step.
    """
    if not id:
        id = construct_unique_step_id_from_label(label, self.steps)
    step = StepReport.begin(label, id=id)
    self.append_step(step)
    return step

append_file

append_file(fr: FileReport) -> 'PipelineReport'

Finalize and append a FileReport; update updated_at.

Parameters:

  • fr

    (FileReport) –

    File report to finalize via FileReport.end and append.

Returns:

  • PipelineReport

    Self (chainable).

Source code in src/pipeline_watcher/core.py
def append_file(self, fr: FileReport) -> "PipelineReport":
    """Finalize and append a :class:`FileReport`; update ``updated_at``.

    Parameters
    ----------
    fr : FileReport
        File report to finalize via :meth:`FileReport.end` and append.

    Returns
    -------
    PipelineReport
        Self (chainable).
    """
    fr.end()
    self.files.append(fr)
    self.updated_at = _now()
    return self

append_step

append_step(step: StepReport) -> 'PipelineReport'

Finalize and append a batch-level step; update updated_at.

Parameters:

  • step

    (StepReport) –

    Step to finalize via StepReport.end and append.

Returns:

  • PipelineReport

    Self (chainable).

Source code in src/pipeline_watcher/core.py
def append_step(self, step: StepReport) -> "PipelineReport":
    """Finalize and append a batch-level step; update ``updated_at``.

    Parameters
    ----------
    step : StepReport
        Step to finalize via :meth:`StepReport.end` and append.

    Returns
    -------
    PipelineReport
        Self (chainable).
    """
    step.end()
    if not step.id:
        step.id = construct_unique_step_id_from_label(step.label, self.steps)
    step.id = make_step_id_unique(step.id, self.steps)
    self.steps.append(step)
    self.updated_at = _now()
    return self

file_processed

file_processed(key, *, require_success: bool = False) -> bool

Whether a file has reached a terminal/finished state.

By default, any terminal status counts (SUCCESS, FAILED, SKIPPED). When require_success=True, only SUCCESS counts.

Parameters:

  • key

    (Any) –

    File identifier/name/path/basename.

  • require_success

    (bool, default: False ) –

    If True, require SUCCESS only.

Returns:

  • bool

    True if processed under the selected rule.

Notes

As an additional heuristic, a file is considered processed if it has finished_at set or percent == 100. The heuristic applies only when require_success is False.

Source code in src/pipeline_watcher/core.py
def file_processed(self, key, *, require_success: bool = False) -> bool:
    """Whether a file has reached a terminal/finished state.

    By default, any terminal status counts (``SUCCESS``, ``FAILED``,
    ``SKIPPED``). When ``require_success=True``, only ``SUCCESS`` counts.

    Parameters
    ----------
    key : Any
        File identifier/name/path/basename.
    require_success : bool, default False
        If ``True``, require ``SUCCESS`` only.

    Returns
    -------
    bool
        ``True`` if processed under the selected rule.

    Notes
    -----
    As an additional heuristic, a file is considered processed if it
    has ``finished_at`` set or ``percent == 100``.
    """
    fr = self.get_file(key)
    if fr is None:
        return False
    if require_success:
        return fr.status.succeeded
    # terminal = finished state OR explicit 100% OR finished_at set
    return (
            fr.status.terminal
            or getattr(fr, "finished_at", None) is not None
            or (getattr(fr, "percent", None) == 100)
    )
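
The decision order in the source above can be traced with a small stand-in. This is a sketch under stated assumptions: FileStub is a hypothetical replacement for FileReport that stores the status as a plain string, and processed is an illustrative name for the same branching logic.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class FileStub:
    """Hypothetical stand-in for FileReport (status held as a plain string)."""
    status: str = "PENDING"
    finished_at: Optional[datetime] = None
    percent: Optional[int] = None


TERMINAL = {"SUCCESS", "FAILED", "SKIPPED"}


def processed(fr: Optional[FileStub], *, require_success: bool = False) -> bool:
    # An unknown file is never processed.
    if fr is None:
        return False
    # Strict mode looks at the status only; the heuristics are skipped.
    if require_success:
        return fr.status == "SUCCESS"
    # Lenient mode: terminal status, a finish timestamp, or 100 percent.
    return fr.status in TERMINAL or fr.finished_at is not None or fr.percent == 100
```

With this stand-in, a file that is still RUNNING but reports percent == 100 counts as processed in the default mode and does not count when require_success=True.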

file_seen

file_seen(key) -> bool

Whether a file with the given key appears in the report at all.

Parameters:

  • key

    (Any) –

    File identifier/name/path/basename (see files_for).

Returns:

  • bool

    True if at least one file report matches key.

Source code in src/pipeline_watcher/core.py
def file_seen(self, key) -> bool:
    """Whether a file with the given key appears in the report at all.

    Parameters
    ----------
    key : Any

    Returns
    -------
    bool
    """
    return self.get_file(key) is not None

files_for

files_for(key) -> list['FileReport']

Return all file reports matching an id/name/path/basename.

Matching is case- and whitespace-insensitive and accepts identifiers, filenames, full paths, and basenames.

Parameters:

  • key

    (Any) –

    A value convertible to a normalized comparison key.

Returns:

  • list[FileReport]

    All matching file reports (may be empty).

Source code in src/pipeline_watcher/core.py
def files_for(self, key) -> list["FileReport"]:
    """Return all file reports matching an id/name/path/basename.

    Matching is case- and whitespace-insensitive and accepts
    identifiers, filenames, full paths, and basenames.

    Parameters
    ----------
    key : Any
        A value convertible to a normalized comparison key.

    Returns
    -------
    list[FileReport]
        All matching file reports (may be empty).
    """
    k = _norm_key(key)
    if not k:
        return []
    out = []
    for fr in self.files:
        if k in _file_keys(fr):
            out.append(fr)
    return out
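
The case- and whitespace-insensitive matching can be illustrated with the normalization that appears in the table_rows_for_files_map source on this page. The sketch assumes _norm_key and _file_keys behave the same way (their source is not shown here); norm_key and file_keys are illustrative names that work on plain values, and PurePosixPath is used only to keep the example platform-independent.

```python
from pathlib import PurePosixPath


def norm_key(value) -> str:
    """Trim, collapse runs of whitespace to one space, and lowercase."""
    return "" if value is None else " ".join(str(value).strip().split()).lower()


def file_keys(file_id=None, name=None, path=None) -> set[str]:
    """Collect every normalized key a file can be matched by."""
    keys = set()
    if file_id:
        keys.add(norm_key(file_id))
    if name:
        keys.add(norm_key(name))
    if path:
        p = PurePosixPath(path)
        keys.add(norm_key(str(p)))  # full path
        keys.add(norm_key(p.name))  # basename
    return keys


keys = file_keys(file_id="F-001", name="Report  Q1.PDF", path="/data/in/Report Q1.pdf")
```

Here the display name and the basename collapse to the same key, so a lookup by " report q1.pdf ", by "F-001", or by the full path would all find the same file.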

get_file

get_file(key) -> 'FileReport | None'

Return the first matching file report or None if absent.

Parameters:

  • key

    (Any) –

    See files_for.

Returns:

  • FileReport or None

Source code in src/pipeline_watcher/core.py
def get_file(self, key) -> "FileReport | None":
    """Return the first matching file report or ``None`` if absent.

    Parameters
    ----------
    key : Any
        See :meth:`files_for`.

    Returns
    -------
    FileReport or None
    """
    matches = self.files_for(key)
    return matches[0] if matches else None

iter_steps

iter_steps(*, status: Status | None = None) -> Iterable[StepReport]

Iterate over batch-level steps, optionally filtering by status.

Parameters:

  • status

    (Status or None, default: None ) –

    If provided, only yield steps whose status equals status.

Yields:

  • StepReport

    Matching steps in append order.

Source code in src/pipeline_watcher/core.py
def iter_steps(self, *, status: Status | None = None) -> Iterable[StepReport]:
    """Iterate over batch-level steps, optionally filtering by status.

    Parameters
    ----------
    status : Status or None, default None
        If provided, only yield steps whose status equals ``status``.

    Yields
    ------
    StepReport
        Matching steps in append order.
    """
    for s in self.steps:
        if status is None or s.status == status:
            yield s

last_step

last_step() -> StepReport | None

Return the last batch-level step or None if empty.

Returns:

  • StepReport or None

Source code in src/pipeline_watcher/core.py
def last_step(self) -> StepReport | None:
    """Return the last batch-level step or ``None`` if empty.

    Returns
    -------
    StepReport or None
    """
    return self.steps[-1] if self.steps else None

recompute_overall_from_steps

recompute_overall_from_steps() -> None

Recalculate overall percent from batch-level steps.

Sets the banner percent to the arithmetic mean of child step percents and preserves the current stage/message (or uses defaults if unset). Does nothing if there are no steps.

Source code in src/pipeline_watcher/core.py
def recompute_overall_from_steps(self) -> None:
    """Recalculate overall ``percent`` from batch-level steps.

    Sets the banner percent to the arithmetic mean of child step
    percents and preserves the current ``stage``/``message`` (or
    uses defaults if unset). Does nothing if there are no steps.
    """
    if not self.steps:
        return
    pct = int(round(sum(s.percent for s in self.steps) / len(self.steps)))
    self.set_progress(self.stage or "steps", pct, self.message or "")
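
As a worked example of the arithmetic, the following sketch reproduces the rounding expression from the source on a plain list of percents. mean_percent is an illustrative name, and returning None for an empty list stands in for the method's "does nothing" branch.

```python
from typing import Optional


def mean_percent(step_percents: list[int]) -> Optional[int]:
    """Rounded arithmetic mean of step percents, or None when there are no steps."""
    if not step_percents:
        return None
    return int(round(sum(step_percents) / len(step_percents)))


three_steps = mean_percent([100, 50, 0])     # 150 / 3 = 50.0
uneven = mean_percent([100, 100, 50])        # 250 / 3 = 83.33...
no_steps = mean_percent([])
```

Note that Python's round uses round-half-to-even, so a mean of exactly 0.5 becomes 0 and a mean of exactly 1.5 becomes 2.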

save

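save(path: Path | str | None = None, *, ensure_dir: bool = True, indent: int = 2, encoding: str = 'utf-8') -> None

Serialize the report with model_dump_json and write it to path, falling back to output_path and then to "reports/progress.json". When ensure_dir is True, missing parent directories are created first.

Source code in src/pipeline_watcher/core.py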
def save(
        self,
        path: Path | str | None = None,
        *,
        ensure_dir: bool = True,
        indent: int = 2,
        encoding: str = "utf-8",
) -> None:
    target = Path(path or self.output_path or "reports/progress.json")
    if ensure_dir:
        target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(
        self.model_dump_json(
            indent=indent,
        ),
        encoding=encoding,
    )
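
The target resolution and write can be sketched without the model. In this illustration, resolve_target and write_json are hypothetical helper names, and json.dumps stands in for model_dump_json so the example runs without Pydantic.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional, Union


def resolve_target(path: Union[Path, str, None], output_path: Optional[Path]) -> Path:
    """Explicit argument wins, then the stored default, then the fallback."""
    return Path(path or output_path or "reports/progress.json")


def write_json(payload: dict, target: Path, *, ensure_dir: bool = True,
               indent: int = 2, encoding: str = "utf-8") -> None:
    """Create parent directories if requested, then write indented JSON."""
    if ensure_dir:
        target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(json.dumps(payload, indent=indent), encoding=encoding)


with tempfile.TemporaryDirectory() as tmp:
    # No explicit path, so the stored default (here a nested temp path) is used.
    target = resolve_target(None, Path(tmp) / "nested" / "run.json")
    write_json({"label": "demo", "percent": 0}, target)
    loaded = json.loads(target.read_text(encoding="utf-8"))
```

Because the fallback "reports/progress.json" is relative, it resolves against the current working directory when neither path nor output_path is set.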

set_progress

set_progress(stage: str, percent: int, message: str = '') -> None

Update the progress banner and timestamp.

Parameters:

  • stage

    (str) –

    Short stage label (e.g., "discover").

  • percent

    (int) –

    Clamped to 0..100.

  • message

    (str, default: "" ) –

    UI-visible note.

Examples:

>>> report = PipelineReport(...)
>>> report.set_progress("discover", 5, "Scanning directory…")

Source code in src/pipeline_watcher/core.py
def set_progress(self, stage: str, percent: int, message: str = "") -> None:
    """Update the progress banner and timestamp.

    Parameters
    ----------
    stage : str
        Short stage label (e.g., ``"discover"``).
    percent : int
        Clamped to ``0..100``.
    message : str, default ""
        UI-visible note.

    Examples
    --------
    >>> report = PipelineReport(...)
    >>> report.set_progress("discover", 5, "Scanning directory…")
    """
    self.stage = stage
    self.percent = max(0, min(100, percent))
    self.message = message
    self.updated_at = _now()
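
The clamping expression can be checked in isolation. clamp_percent is an illustrative name for the one-line expression used in the source above.

```python
def clamp_percent(percent: int) -> int:
    """Limit a percent value to the inclusive range 0..100."""
    return max(0, min(100, percent))


below = clamp_percent(-5)
inside = clamp_percent(42)
above = clamp_percent(150)
```

Out-of-range values are pinned to the nearest bound instead of raising, so callers that need strict validation would have to check the value themselves before calling set_progress.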

table_rows_for_files_map

table_rows_for_files_map(expected: Mapping[str, Any]) -> list[dict]

Produce Django/Jinja-friendly summary rows for a files mapping.

Accepts a mapping of {filename_or_id_or_path: other_properties} and returns table rows with normalized status fields for templating.

Parameters:

  • expected

    (Mapping[str, Any]) –

    Keys are identifiers/names/paths; values are attached as other.

Returns:

  • list[dict]

    Rows with fields:

      - filename : str
      - seen : bool
      - status : str ("SUCCESS"|"FAILED"|"SKIPPED"|"RUNNING"|"PENDING"|"MISSING")
      - percent : int or None
      - flagged_human_review : bool
      - human_review_reason : str
      - file_id : str or None
      - path : str or None
      - other : Any (value from expected)

Source code in src/pipeline_watcher/core.py
def table_rows_for_files_map(self, expected: Mapping[str, Any]) -> list[dict]:
    """Produce Django/Jinja-friendly summary rows for a files mapping.

    Accepts a mapping of ``{filename_or_id_or_path: other_properties}``
    and returns table rows with normalized status fields for templating.

    Parameters
    ----------
    expected : Mapping[str, Any]
        Keys are identifiers/names/paths; values are attached as ``other``.

    Returns
    -------
    list[dict]
        Rows with fields:
        - ``filename`` : str
        - ``seen`` : bool
        - ``status`` : str (``"SUCCESS"|"FAILED"|"SKIPPED"|"RUNNING"|"PENDING"|"MISSING"``)
        - ``percent`` : int or None
        - ``flagged_human_review`` : bool
        - ``human_review_reason`` : str
        - ``file_id`` : str or None
        - ``path`` : str or None
        - ``other`` : Any (value from ``expected``)
    """
    def _norm(s: str | None) -> str:
        return "" if s is None else " ".join(str(s).strip().split()).lower()

    def _keys(fr: "FileReport") -> set[str]:
        ks = set()
        if fr.file_id: ks.add(_norm(fr.file_id))
        if fr.name:    ks.add(_norm(fr.name))
        if fr.path:
            p = Path(fr.path)
            ks.add(_norm(str(p)))
            ks.add(_norm(p.name))
        return ks

    # index by all comparable keys (first match wins)
    index: dict[str, "FileReport"] = {}
    for fr in self.files:
        for k in _keys(fr):
            index.setdefault(k, fr)

    rows: list[dict] = []
    for filename, other in expected.items():
        display_name = str(filename)
        key = _norm(display_name)
        fr = index.get(key)

        if fr is None:
            rows.append({
                "filename": display_name,
                "seen": False,
                "status": "MISSING",
                "percent": None,
                "flagged_human_review": False,
                "human_review_reason": "",
                "file_id": None,
                "path": None,
                "other": other,
            })
        else:
            flagged = getattr(fr, "requires_human_review", False)
            reason  = getattr(fr, "human_review_reason", None) or ""
            rows.append({
                "filename": display_name,
                "seen": True,
                "status": fr.status,
                "percent": getattr(fr, "percent", None),
                "flagged_human_review": bool(flagged),
                "human_review_reason": reason,
                "file_id": getattr(fr, "file_id", None),
                "path": getattr(fr, "path", None),
                "other": other,
            })

    return rows
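
The "first match wins" index and the MISSING fallback can be seen on plain dictionaries. This is a reduced sketch, not the method itself: the file records, their pre-normalized keys sets, and build_index are all hypothetical, and only a few of the row fields are reproduced.

```python
def build_index(files: list[dict]) -> dict[str, dict]:
    """Map every comparable key to the earliest file that carries it."""
    index: dict[str, dict] = {}
    for f in files:
        for key in f["keys"]:
            index.setdefault(key, f)  # later files never overwrite a key
    return index


files = [
    {"file_id": "f1", "keys": {"a.pdf", "f1"}, "status": "SUCCESS"},
    {"file_id": "f2", "keys": {"a.pdf", "f2"}, "status": "FAILED"},
]
index = build_index(files)

expected = {"a.pdf": {"owner": "ops"}, "z.pdf": None}
rows = [
    {
        "filename": name,
        "seen": name in index,
        "status": index[name]["status"] if name in index else "MISSING",
        "other": other,
    }
    for name, other in expected.items()
]
```

Both records share the key "a.pdf", so the row for "a.pdf" reflects the first record; "z.pdf" matches nothing and becomes a MISSING row that still carries its other value.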

unseen_expected

unseen_expected(expected_iterable) -> list[str]

Return expected filenames/ids/paths that are not present.

Parameters:

  • expected_iterable

    (Iterable[Any]) –

    Filenames/ids/paths expected to appear in files.

Returns:

  • list[str]

    Items from expected_iterable that were not matched.

Source code in src/pipeline_watcher/core.py
def unseen_expected(self, expected_iterable) -> list[str]:
    """Return expected filenames/ids/paths that are **not** present.

    Parameters
    ----------
    expected_iterable : Iterable[Any]
        Filenames/ids/paths expected to appear in :attr:`files`.

    Returns
    -------
    list[str]
        Items from ``expected_iterable`` that were not matched.
    """
    have = set()
    for fr in self.files:
        have |= _file_keys(fr)
    missing = []
    for x in expected_iterable:
        k = _norm_key(x)
        if k and k not in have:
            missing.append(str(x))
    return missing
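
The comparison can be sketched on a plain set of keys. The example assumes _norm_key normalizes the same way as the _norm helper in the table_rows_for_files_map source (trim, collapse whitespace, lowercase); norm_key and unseen are illustrative names.

```python
def norm_key(value) -> str:
    """Trim, collapse runs of whitespace to one space, and lowercase."""
    return "" if value is None else " ".join(str(value).strip().split()).lower()


def unseen(expected, have_keys: set[str]) -> list[str]:
    """Return expected items whose normalized key is not among have_keys."""
    missing = []
    for item in expected:
        k = norm_key(item)
        # Items that normalize to an empty key are skipped, not reported.
        if k and k not in have_keys:
            missing.append(str(item))
    return missing


have = {norm_key("a.pdf"), norm_key("/in/b.pdf"), norm_key("b.pdf")}
result = unseen(["A.PDF", " b.pdf ", "c.pdf", ""], have)
```

The returned items keep their original spelling (via str), and an empty expected entry is dropped silently instead of being listed as missing.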