Data Pipeline Architecture (dev-health-ops)

Pipeline Overview

The dev-health-ops backend follows a strict unidirectional pipeline:

Connectors → Processors → Sinks → Metrics → Visualization

Each stage has clear responsibilities. Do not collapse layers or bypass stages.


1. Connectors (connectors/)

Purpose: Fetch raw data from external providers.

Supported Providers

| Provider  | Module                  | Sync Targets                                       |
|-----------|-------------------------|----------------------------------------------------|
| Local Git | connectors/local.py     | git, blame                                         |
| GitHub    | connectors/github.py    | git, prs, cicd, deployments, incidents, work-items |
| GitLab    | connectors/gitlab.py    | git, prs, cicd, deployments, incidents, work-items |
| Jira      | connectors/jira.py      | work-items                                         |
| Synthetic | connectors/synthetic.py | fixtures generation                                |

Rules

  • Network I/O should be async and batch-friendly
  • Respect rate limits and apply backoff when throttled
  • Return raw provider data (minimal transformation)
  • Handle pagination completely (never assume a single page); see the sketch below
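
As a concrete illustration of these rules, a fetch method might look like the following sketch. It assumes an httpx async client and a GitHub-style Link header for pagination; the endpoint, field names, and payload shape are illustrative, not the actual connector API:

# Hypothetical sketch only -- the real connectors define their own clients and models.
import asyncio
from typing import Any, AsyncIterator

import httpx


async def fetch_pull_requests(client: httpx.AsyncClient, repo: str) -> AsyncIterator[dict[str, Any]]:
    """Yield raw PR payloads, following pagination until the provider is exhausted."""
    url: str | None = f"https://api.example.com/repos/{repo}/pulls?per_page=100"
    while url:
        resp = await client.get(url)
        if resp.status_code == 429:  # rate limited: back off, then retry the same page
            await asyncio.sleep(int(resp.headers.get("Retry-After", "5")))
            continue
        resp.raise_for_status()
        for item in resp.json():
            yield item  # raw provider data; normalization happens later in processors/
        url = resp.links.get("next", {}).get("url")  # never assume a single page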

2. Processors (processors/)

Purpose: Normalize and transform connector outputs into internal models.

Key Processor

  • processors/local.py — Primary processor for local git data

Responsibilities

  • Map provider-specific fields to unified models
  • Normalize timestamps to UTC
  • Resolve identities across providers
  • Enrich with computed fields (e.g., commit size buckets)

Rules

  • No network I/O
  • No persistence logic
  • Transform only, no business decisions
  • Output must match models in models/
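
A minimal sketch of such a transform, assuming a raw commit payload from a connector and a stand-in for the real model in models/ (all field names here are illustrative):

# Hypothetical sketch only -- pure transformation, no network I/O, no persistence.
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any


@dataclass
class Commit:  # stand-in for the actual model defined in models/
    sha: str
    author_email: str
    authored_at: datetime
    size_bucket: str


def _size_bucket(lines_changed: int) -> str:
    return "small" if lines_changed < 50 else "medium" if lines_changed < 500 else "large"


def to_commit(raw: dict[str, Any]) -> Commit:
    # Assumes an offset-aware ISO-8601 timestamp; normalize to UTC.
    authored_at = datetime.fromisoformat(raw["authored_at"]).astimezone(timezone.utc)
    return Commit(
        sha=raw["sha"],
        author_email=raw["author_email"].lower(),  # simple identity normalization
        authored_at=authored_at,
        size_bucket=_size_bucket(raw["additions"] + raw["deletions"]),
    )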

3. Storage / Sinks (metrics/sinks/)

Purpose: Persist processed data to storage backends.

Supported Backends

| Backend    | Connection            | Use Case               |
|------------|-----------------------|------------------------|
| PostgreSQL | postgresql+asyncpg:// | Relational, migrations |
| ClickHouse | clickhouse://         | Analytics queries      |
| MongoDB    | mongodb://            | Document storage       |
| SQLite     | sqlite+aiosqlite://   | Local dev/test         |

Rules

  • No file exports. No debug dumps. No JSON/YAML output paths.
  • All persistence goes through sink modules
  • Backend selection via --db flag or DATABASE_URI
  • Secondary sink via SECONDARY_DATABASE_URI with sink='both'

Sink Interface

async def write_batch(records: List[Model], session: AsyncSession) -> int:
    """Write a batch of records. Returns count written."""

4. Metrics (metrics/)

Purpose: Compute higher-level rollups and aggregates from persisted data.

Key Metric Tables

| Table                   | Key                                     | Content                     |
|-------------------------|-----------------------------------------|-----------------------------|
| repo_metrics_daily      | (repo_id, day)                          | Commits, LOC, PR cycle time |
| user_metrics_daily      | (repo_id, author_email, day)            | User activity               |
| work_item_metrics_daily | (day, provider, work_scope_id, team_id) | Throughput, WIP, cycle time |
| team_metrics_daily      | (team_id, day)                          | After-hours, weekend ratios |

Computation Model

  • Metrics are append-only with computed_at versioning
  • Use argMax(<metric>, computed_at) to get latest value
  • Re-computation is safe (idempotent via compound keys)
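
For example, reading the latest computed value per compound key could look like this ClickHouse query (the commits_count column is illustrative):

SELECT
    repo_id,
    day,
    argMax(commits_count, computed_at) AS commits_count
FROM repo_metrics_daily
GROUP BY repo_id, day
ORDER BY day;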

5. Visualization (Grafana + dev-health-web)

Purpose: Render persisted data for exploration.

Grafana

  • Dashboards provisioned via grafana/ directory
  • Query conventions:
      • Prefer table format with stable time ordering
      • Handle team_id null/empty normalization
      • Avoid ClickHouse WITH name = expr syntax; use WITH ... AS
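
Put together, a dashboard query could follow this sketch; the throughput column and the 'unassigned' label are illustrative:

-- Uses WITH ... AS and normalizes null/empty team_id before grouping.
WITH coalesce(nullIf(team_id, ''), 'unassigned') AS team
SELECT
    day,
    team,
    argMax(throughput, computed_at) AS throughput
FROM work_item_metrics_daily
GROUP BY day, team
ORDER BY day;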

dev-health-web

  • Visualization-only: must not become a source of truth
  • Consumes data via GraphQL API from dev-health-ops
  • No category recomputation at UX time

Storage Schema Highlights

ClickHouse Tables

Tables are MergeTree partitioned by toYYYYMM(day):

CREATE TABLE repo_metrics_daily (
    repo_id UUID,
    day Date,
    computed_at DateTime,
    -- metrics columns
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(day)
ORDER BY (repo_id, day);

PostgreSQL Tables

Managed via Alembic migrations in alembic/:

# Generate migration
alembic revision --autogenerate -m "description"

# Apply migrations
alembic upgrade head

Environment Variables

| Variable               | Purpose                           | Required          |
|------------------------|-----------------------------------|-------------------|
| DATABASE_URI           | Primary database connection       | Yes               |
| SECONDARY_DATABASE_URI | Secondary sink (with --sink both) | No                |
| DB_ECHO                | Enable SQL logging                | No                |
| BATCH_SIZE             | Records per batch insert          | No (default: 100) |
| MAX_WORKERS            | Parallel workers                  | No (default: 4)   |
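
An example local configuration (all connection strings below are placeholders):

# Placeholder values -- substitute real hosts and credentials.
export DATABASE_URI="postgresql+asyncpg://dev:dev@localhost:5432/devhealth"
export SECONDARY_DATABASE_URI="clickhouse://localhost:9000/devhealth"  # used with --sink both
export BATCH_SIZE=500
export MAX_WORKERS=8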

Adding New Pipeline Components

New Connector

  1. Create connectors/newprovider.py
  2. Implement async fetch methods
  3. Register in connectors/__init__.py
  4. Add CLI integration in cli.py
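
A new connector module usually starts from a small skeleton like the one below; the class name, method, and constructor arguments are hypothetical, and the real base class and registration pattern should be copied from an existing connector such as connectors/jira.py:

# connectors/newprovider.py -- hypothetical skeleton only.
from typing import Any


class NewProviderConnector:
    """Fetches raw data from the new provider; normalization belongs in processors/."""

    def __init__(self, base_url: str, token: str) -> None:
        self.base_url = base_url
        self.token = token

    async def sync_work_items(self, project: str) -> list[dict[str, Any]]:
        """Return raw work items, paginating until exhausted (see the fetch sketch above)."""
        raise NotImplementedError  # provider-specific pagination and backoff go here

Registration in connectors/__init__.py and the CLI wiring in cli.py should follow whatever pattern the existing providers already use.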

New Metric

  1. Define model in models/
  2. Add sink in metrics/sinks/
  3. Implement computation in metrics/
  4. Create Alembic migration if using Postgres
  5. Update Grafana dashboards
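
For a ClickHouse-backed metric, the new table usually mirrors the existing schema conventions (compound ordering key plus computed_at versioning); the table and metric column below are hypothetical:

CREATE TABLE review_latency_daily (
    repo_id UUID,
    day Date,
    computed_at DateTime,
    median_review_latency_hours Float64
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(day)
ORDER BY (repo_id, day);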

Rules When Modifying

  • Never bypass sinks for persistence
  • Always handle pagination
  • Add tests under tests/
  • Respect existing async patterns