Skip to content

Data Pipeline Architecture (dev-health-ops)

Pipeline Overview

The dev-health-ops backend follows a strict unidirectional pipeline:

Connectors → Processors → Sinks → Metrics → Visualization

Each stage has clear responsibilities. Do not collapse layers or bypass stages.


All paths below are relative to src/dev_health_ops/.

1. Connectors (connectors/)

Purpose: Fetch raw data from external providers.

Supported Providers

Provider Module Sync Targets
Local Git connectors/local.py git, blame
GitHub connectors/github.py git, prs, cicd, deployments, incidents, work-items
GitLab connectors/gitlab.py git, prs, cicd, deployments, incidents, work-items
Jira connectors/jira.py work-items
Synthetic connectors/synthetic.py fixtures generation

Rules

  • Network I/O should be async and batch-friendly
  • Respect rate limits and backoff mechanisms
  • Return raw provider data (minimal transformation)
  • Handle pagination completely (never assume single page)

2. Processors (processors/)

Purpose: Normalize and transform connector outputs into internal models.

Key Processor

  • processors/local.py — Primary processor for local git data

Responsibilities

  • Map provider-specific fields to unified models
  • Normalize timestamps to UTC
  • Resolve identities across providers
  • Enrich with computed fields (e.g., commit size buckets)

Rules

  • No network I/O
  • No persistence logic
  • Transform only, no business decisions
  • Output must match models in models/

3. Storage / Sinks (metrics/sinks/)

Purpose: Persist processed data to storage backends.

Supported Backends

Backend Connection Use Case
PostgreSQL postgresql+asyncpg:// Relational, migrations
ClickHouse clickhouse:// Analytics queries
MongoDB mongodb:// Document storage
SQLite sqlite+aiosqlite:// Local dev/test

Rules

  • No file exports. No debug dumps. No JSON/YAML output paths.
  • All persistence goes through sink modules
  • Backend selection via --db flag or DATABASE_URI
  • Secondary sink via SECONDARY_DATABASE_URI with sink='both'

Sink Interface

async def write_batch(records: List[Model], session: AsyncSession) -> int:
    """Write a batch of records. Returns count written."""

4. Metrics (metrics/)

Purpose: Compute higher-level rollups and aggregates from persisted data.

Key Metric Tables

Table Key Content
repo_metrics_daily (repo_id, day) Commits, LOC, PR cycle time
user_metrics_daily (repo_id, author_email, day) User activity
work_item_metrics_daily (day, provider, work_scope_id, team_id) Throughput, WIP, cycle time
team_metrics_daily (team_id, day) After-hours, weekend ratios

Computation Model

  • Metrics are append-only with computed_at versioning
  • Use argMax(<metric>, computed_at) to get latest value
  • Re-computation is safe (idempotent via compound keys)

5. Visualization (dev-health-web)

Purpose: Render persisted data for exploration.

dev-health-web

  • Visualization-only — Must not become source of truth
  • Consumes data via GraphQL API from dev-health-ops
  • No category recomputation at UX time

Backfill Pipeline

Historical backfill reuses the same data pipeline (Connectors -> Processors -> Sinks) but operates differently from incremental sync:

How It Works

  1. Date range splitting -- The BackfillChunker divides the requested date range into 7-day windows
  2. Sequential processing -- Each chunk runs through the standard sync pipeline independently
  3. Progress tracking -- A BackfillJob record in PostgreSQL tracks chunk completion and overall progress

Key Differences from Incremental Sync

Aspect Incremental Sync Backfill
Trigger Scheduled / manual Manual or API-triggered
Date range From watermark to now Explicit --since / --before
Watermarks Updates SyncWatermarks Never updates watermarks
Chunking Single pass 7-day windows
Progress Job run status only Per-chunk progress via BackfillJob
Queue sync backfill (dedicated)

Tier Limits

Backfill depth is gated by organization billing tier:

Tier Max Days
Community 30
Team 90
Enterprise Unlimited

Components

  • backfill/chunker.py -- chunk_date_range() splits date ranges into windows
  • backfill/runner.py -- run_backfill_for_config() orchestrates chunked sync
  • backfill/cli.py -- dev-hops backfill run CLI command
  • workers/tasks.py -- run_backfill Celery task on backfill queue
  • models/backfill.py -- BackfillJob PostgreSQL model for progress tracking
  • api/services/backfill.py -- BackfillJobService async CRUD for API layer

Storage Schema Highlights

ClickHouse Tables

Tables are MergeTree partitioned by toYYYYMM(day):

CREATE TABLE repo_metrics_daily (
    repo_id UUID,
    day Date,
    computed_at DateTime,
    -- metrics columns
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(day)
ORDER BY (repo_id, day);

PostgreSQL Tables

Managed via Alembic migrations in alembic/:

# Generate migration
alembic revision --autogenerate -m "description"

# Apply migrations
alembic upgrade head

Environment Variables

Variable Purpose Required
DATABASE_URI Primary database connection Yes
SECONDARY_DATABASE_URI Secondary sink (with --sink both) No
DB_ECHO Enable SQL logging No
BATCH_SIZE Records per batch insert No (default: 100)
MAX_WORKERS Parallel workers No (default: 4)

Adding New Pipeline Components

New Connector

  1. Create connectors/newprovider.py
  2. Implement async fetch methods
  3. Register in connectors/__init__.py
  4. Add CLI integration in cli.py

New Metric

  1. Define model in models/
  2. Add sink in metrics/sinks/
  3. Implement computation in metrics/
  4. Create Alembic migration if using Postgres
  5. Update dev-health-web or OTLP dashboards as needed

Rules When Modifying

  • Never bypass sinks for persistence
  • Always handle pagination
  • Add tests under tests/
  • Respect existing async patterns