Data Pipeline Architecture (dev-health-ops)¶
Pipeline Overview¶
The dev-health-ops backend follows a strict unidirectional pipeline:
Connectors → Processors → Sinks → Metrics → Visualization
Each stage has clear responsibilities. Do not collapse layers or bypass stages.
1. Connectors (connectors/)¶
Purpose: Fetch raw data from external providers.
Supported Providers¶
| Provider | Module | Sync Targets |
|---|---|---|
| Local Git | connectors/local.py |
git, blame |
| GitHub | connectors/github.py |
git, prs, cicd, deployments, incidents, work-items |
| GitLab | connectors/gitlab.py |
git, prs, cicd, deployments, incidents, work-items |
| Jira | connectors/jira.py |
work-items |
| Synthetic | connectors/synthetic.py |
fixtures generation |
Rules¶
- Network I/O should be async and batch-friendly
- Respect rate limits and backoff mechanisms
- Return raw provider data (minimal transformation)
- Handle pagination completely (never assume single page)
2. Processors (processors/)¶
Purpose: Normalize and transform connector outputs into internal models.
Key Processor¶
processors/local.py— Primary processor for local git data
Responsibilities¶
- Map provider-specific fields to unified models
- Normalize timestamps to UTC
- Resolve identities across providers
- Enrich with computed fields (e.g., commit size buckets)
Rules¶
- No network I/O
- No persistence logic
- Transform only, no business decisions
- Output must match models in
models/
3. Storage / Sinks (metrics/sinks/)¶
Purpose: Persist processed data to storage backends.
Supported Backends¶
| Backend | Connection | Use Case |
|---|---|---|
| PostgreSQL | postgresql+asyncpg:// |
Relational, migrations |
| ClickHouse | clickhouse:// |
Analytics queries |
| MongoDB | mongodb:// |
Document storage |
| SQLite | sqlite+aiosqlite:// |
Local dev/test |
Rules¶
- No file exports. No debug dumps. No JSON/YAML output paths.
- All persistence goes through sink modules
- Backend selection via
--dbflag orDATABASE_URI - Secondary sink via
SECONDARY_DATABASE_URIwithsink='both'
Sink Interface¶
async def write_batch(records: List[Model], session: AsyncSession) -> int:
"""Write a batch of records. Returns count written."""
4. Metrics (metrics/)¶
Purpose: Compute higher-level rollups and aggregates from persisted data.
Key Metric Tables¶
| Table | Key | Content |
|---|---|---|
repo_metrics_daily |
(repo_id, day) |
Commits, LOC, PR cycle time |
user_metrics_daily |
(repo_id, author_email, day) |
User activity |
work_item_metrics_daily |
(day, provider, work_scope_id, team_id) |
Throughput, WIP, cycle time |
team_metrics_daily |
(team_id, day) |
After-hours, weekend ratios |
Computation Model¶
- Metrics are append-only with
computed_atversioning - Use
argMax(<metric>, computed_at)to get latest value - Re-computation is safe (idempotent via compound keys)
5. Visualization (Grafana + dev-health-web)¶
Purpose: Render persisted data for exploration.
Grafana¶
- Dashboards provisioned via
grafana/directory - Query conventions:
- Prefer table format with stable time ordering
- Handle
team_idnull/empty normalization - Avoid ClickHouse
WITH name = exprsyntax; useWITH ... AS
dev-health-web¶
- Visualization-only — Must not become source of truth
- Consumes data via GraphQL API from dev-health-ops
- No category recomputation at UX time
Storage Schema Highlights¶
ClickHouse Tables¶
Tables are MergeTree partitioned by toYYYYMM(day):
CREATE TABLE repo_metrics_daily (
repo_id UUID,
day Date,
computed_at DateTime,
-- metrics columns
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(day)
ORDER BY (repo_id, day);
PostgreSQL Tables¶
Managed via Alembic migrations in alembic/:
# Generate migration
alembic revision --autogenerate -m "description"
# Apply migrations
alembic upgrade head
Environment Variables¶
| Variable | Purpose | Required |
|---|---|---|
DATABASE_URI |
Primary database connection | Yes |
SECONDARY_DATABASE_URI |
Secondary sink (with --sink both) |
No |
DB_ECHO |
Enable SQL logging | No |
BATCH_SIZE |
Records per batch insert | No (default: 100) |
MAX_WORKERS |
Parallel workers | No (default: 4) |
Adding New Pipeline Components¶
New Connector¶
- Create
connectors/newprovider.py - Implement async fetch methods
- Register in
connectors/__init__.py - Add CLI integration in
cli.py
New Metric¶
- Define model in
models/ - Add sink in
metrics/sinks/ - Implement computation in
metrics/ - Create Alembic migration if using Postgres
- Update Grafana dashboards
Rules When Modifying¶
- Never bypass sinks for persistence
- Always handle pagination
- Add tests under
tests/ - Respect existing async patterns