When autonomous agents generate insurance software, the data flowing through those systems touches policyholder PII, claims records, financial transactions, and protected health information. Without a dedicated data plane — lineage tracking, quality validation, pipeline observability, continuous classification, and source-to-target reconciliation — there is no defensible answer when a regulator asks where a data element came from, whether it was accurate, or whether calculations are mathematically consistent across systems. The Agentic Data Plane closes that gap, providing a governance-grade data infrastructure layer that operates beneath every agent-generated system.
Why autonomous agents building insurance software require a dedicated data governance layer — and what breaks without one.
Agents generate code that transforms data — enriching quotes with bureau data, mapping claims fields, aggregating policy records. Each transformation moves data further from its source. Without lineage tracking, a state DOI examiner asking "where did this loss ratio figure come from?" receives a shrug. No DAG. No audit trail. No defensible answer.
Data quality issues in agent-generated systems accumulate silently. A null rate class propagates into a pricing model. A stale bureau score feeds a risk tier. Without automated quality validation built into the pipeline, degraded data moves to production undetected — only surfacing when a claim adjudication fails or a regulatory filing is rejected.
When agents modify upstream systems, downstream consumers have no visibility into what changed or when. A schema alteration in a policy administration system silently breaks a reporting pipeline. Pipeline observability — real-time monitoring of data flows with anomaly detection — is the only way to catch these breaks before they become compliance failures.
Classification at rest is insufficient when agents constantly generate new data structures. A new claims table may contain PHI the classification system never saw. A synthetic test dataset may inadvertently include real policyholder names. Continuous classification — scanning data in motion — is required to maintain HIPAA, state DOI, and NAIC compliance posture as systems evolve.
When an agent transforms premium calculations from a rating engine into a policy administration system, can you prove the numbers match? Financial auditors require mathematical proof that calculations in the upstream system produce identical results in the target — not just that the data arrived, but that every subtotal, aggregation, and derived value is consistent end-to-end. Without source-to-target reconciliation, a single rounding error or silent truncation becomes an audit finding.
The core problem: Insurance carriers operating under NAIC model laws, state DOI regulations, and HIPAA must be able to demonstrate data provenance, quality, classification, and calculation integrity for every system they deploy. When those systems are agent-generated and evolving continuously, a static, periodic approach to data governance fails. The data plane must be continuous, automated, and integrated — not a quarterly audit exercise.
Six integrated components forming a complete data governance plane for agent-generated insurance systems.
DAG-based provenance tracking for every data transformation from source to output, with immutable event records.
Automated scoring across completeness, accuracy, consistency, and timeliness dimensions with configurable thresholds.
Real-time monitoring of data flows with anomaly detection, schema drift alerts, and SLA tracking.
In-motion PII, PHI, and regulated data detection using ML classifiers tuned for insurance data types.
Visual DAG explorer tracing any output field through every transformation back to its origin source.
Source-to-target calculation proof with SHA-256 manifests, deterministic replay, balance reconciliation, and auditor-ready reports.
// Agentic Data Plane — Core Pipeline Flow
1. DATA INGESTION EVENT
source: PolicyAdminSystem | ClaimsSystem | BureauFeed | AgentOutput
├── emit: LineageEvent { event_id, source_id, schema_hash, timestamp }
├── trigger: ClassificationEngine.scan(data_stream)
└── publish: audit_bus.lineage_ingestion
2. CLASSIFICATION ENGINE (in-motion scan)
for each field in data_stream:
├── run: PIIDetector.classify(field) → NONE | PII | PHI | FINANCIAL
├── run: InsuranceClassifier.classify(field) → PUBLIC | INTERNAL | CONFIDENTIAL | RESTRICTED
├── if tier == RESTRICTED:
│ ├── emit: GovernanceAlert { severity: HIGH, field, detected_type }
│ └── publish: audit_bus.classification_violation
└── persist: ClassificationRecord { field_path, tier, confidence, timestamp }
3. LINEAGE ENGINE (transformation tracking)
on: AgentCodeExecution | DataTransformation | PipelineStep
├── record: LineageNode { node_id, operation, inputs[], outputs[], agent_id }
├── link: LineageEdge { from: input_node_id, to: output_node_id, transform_fn }
├── update: LineageDAG.add(node, edges)
└── publish: audit_bus.lineage_transform
4. QUALITY VALIDATION PIPELINE
for each dataset at checkpoint:
├── score: Completeness = (non_null_fields / total_fields) * 250
├── score: Accuracy = (validated_values / total_values) * 250
├── score: Consistency = (schema_conforming / total_records) * 250
├── score: Timeliness = freshness_factor(data_age, sla_hours) * 250
├── total_score = sum(dimension_scores) // 0–1000
├── if total_score < threshold:
│ ├── emit: QualityAlert { dataset_id, score, failing_dimensions }
│ └── publish: audit_bus.quality_violation
└── persist: QualityRecord { dataset_id, score, dimensions, timestamp }
5. PIPELINE OBSERVABILITY
monitor: all_active_pipelines
├── check: schema_drift(current_schema, baseline_schema)
├── check: volume_anomaly(record_count, expected_range)
├── check: latency_sla(processing_time, max_allowed_ms)
├── check: null_rate_spike(null_pct, baseline_null_pct)
├── if anomaly_detected:
│ ├── emit: PipelineAlert { pipeline_id, anomaly_type, severity }
│ └── notify: downstream_consumers[]
└── publish: audit_bus.pipeline_health
6. LINEAGE QUERY API
query: LineageAPI.trace(output_field_id)
├── traverse: LineageDAG backward from output_field_id
├── collect: all ancestor nodes, edges, transforms
├── resolve: source_systems[], agent_ids[], timestamps[]
└── return: LineageReport { path[], sources[], transforms[], duration_ms }
7. FINANCIAL RECONCILIATION
trigger: pipeline_run.complete
├── extract_stage: compare(source_count, extracted_count, sha256(primary_keys))
├── transform_stage: for each calculation:
│ ├── capture: { inputs, formula_ref, outputs } → ComputationRecord
│ └── verify: replay(inputs, formula) === stored_output
├── aggregate_stage: recompute(control_totals_from_details) === stored_aggregates
│ └── tolerance: exact(counts), ±$0.01(currency), configurable(derived)
├── load_stage: compare(staged_hash, committed_hash)
├── end_to_end: compare(source_grand_totals, target_grand_totals)
├── if break_detected:
│ ├── emit: ReconciliationBreak { break_id, stage, field, expected, actual, variance }
│ ├── block: downstream_consumption
│ └── route: break_resolution_workflow
├── generate: ReconciliationManifest { verdict, stages[], breaks[], signature }
└── publish: audit_bus.data_reconciliation
8. GOVERNANCE AUDIT BUS INTEGRATION
subscribe: governance_framework.audit_bus
├── forward: lineage events → audit_trail.data_lineage
├── forward: quality alerts → audit_trail.data_quality
├── forward: classification → audit_trail.data_classification
├── forward: reconciliation → audit_trail.data_reconciliation
└── forward: pipeline health → audit_trail.pipeline_observability
Design principle: Every component publishes to the same governance audit bus used by the broader framework. Lineage events, quality alerts, classification records, reconciliation manifests, and pipeline health metrics are all first-class citizens in the audit trail — not siloed data plane concerns. A regulator pulling an audit trail sees data provenance and code provenance in the same chronological stream.
Detailed specifications for each component, including data classification tiers, quality scoring, lineage event types, and governance integration points.
| Tier | Label | Insurance Data Examples | Regulatory Framework | Handling Requirements |
|---|---|---|---|---|
| T0 | Public | Published rate filings, product brochures, aggregated industry statistics | None specific | No restrictions; may be shared freely |
| T1 | Internal | Underwriting guidelines, internal loss ratios, agent performance metrics | Trade secret protection | Access limited to employees; not for external sharing |
| T2 | Confidential | Policyholder name, address, policy number, premium data, claims history | Gramm-Leach-Bliley (GLBA), state DOI regulations, NAIC Privacy Model | Encryption at rest and in transit; access logging; data minimization |
| T3 | Restricted | SSN, DOB, medical diagnoses, treatment records, financial account numbers, claims PHI | HIPAA, ACA, state insurance codes, NAIC Health Information Model Act | Maximum protection; field-level encryption; audit on every access; breach notification required |
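The tier scheme above can be sketched in TypeScript as a policy lookup plus a strictest-tier-wins rule for records carrying fields at multiple tiers. The `DataTier`/`TierPolicy` names and the `effectiveTier` rule are illustrative assumptions, not part of the specification:

```typescript
// Illustrative sketch: the four classification tiers and their handling policies.
type DataTier = "T0" | "T1" | "T2" | "T3";

interface TierPolicy {
  label: string;
  encryptAtRest: boolean;
  fieldLevelEncryption: boolean;
  auditEveryAccess: boolean;
  breachNotification: boolean;
}

const TIER_POLICIES: Record<DataTier, TierPolicy> = {
  T0: { label: "Public",       encryptAtRest: false, fieldLevelEncryption: false, auditEveryAccess: false, breachNotification: false },
  T1: { label: "Internal",     encryptAtRest: false, fieldLevelEncryption: false, auditEveryAccess: false, breachNotification: false },
  T2: { label: "Confidential", encryptAtRest: true,  fieldLevelEncryption: false, auditEveryAccess: false, breachNotification: false },
  T3: { label: "Restricted",   encryptAtRest: true,  fieldLevelEncryption: true,  auditEveryAccess: true,  breachNotification: true  },
};

// When a record mixes tiers, the strictest tier governs handling.
function effectiveTier(fieldTiers: DataTier[]): DataTier {
  const order: DataTier[] = ["T0", "T1", "T2", "T3"];
  return fieldTiers.reduce(
    (max, t) => (order.indexOf(t) > order.indexOf(max) ? t : max),
    "T0" as DataTier,
  );
}
```

A record containing both a policy number (T2) and an SSN (T3) would resolve to T3 handling end to end.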
| Dimension | Weight | Measurement Method | Insurance-Specific Checks | Alert Threshold |
|---|---|---|---|---|
| Completeness | 250 pts | Non-null required fields / total required fields | Required claims fields, mandatory policy fields per state form filing | < 200 pts triggers alert |
| Accuracy | 250 pts | Validated values against reference data / total values | NAIC codes, ISO classification codes, VIN validation, ZIP+4 verification | < 175 pts triggers alert |
| Consistency | 250 pts | Schema-conforming records / total records; cross-field rule checks | Effective date before expiry date, premium > 0 for bound policies, deductible ≤ limit | < 200 pts triggers alert |
| Timeliness | 250 pts | Freshness factor based on data age vs. configured SLA | Bureau data < 90 days, claims status < 24 hours, payment data < 1 hour | < 150 pts triggers alert |
| Total | 1000 pts | Sum of the four dimension scores (0–1000 point scale) | — | Score < 700 blocks pipeline promotion to production |
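The scoring arithmetic in the table above can be sketched as follows; the flat `QualityInputs` shape and field names are assumptions for illustration, and the timeliness formula mirrors the `freshness_factor` used in the pipeline flow:

```typescript
// Minimal sketch of the four-dimension 0–1000 quality score.
interface QualityInputs {
  nonNullRequired: number; totalRequired: number;   // completeness
  validatedValues: number; totalValues: number;     // accuracy
  conformingRecords: number; totalRecords: number;  // consistency
  dataAgeHours: number; slaHours: number;           // timeliness
}

interface QualityScore {
  completeness: number; accuracy: number; consistency: number; timeliness: number;
  total: number; blocked: boolean;
}

function scoreDataset(d: QualityInputs, blockingThreshold = 700): QualityScore {
  const ratio = (num: number, den: number) => (den === 0 ? 0 : num / den);
  const completeness = 250 * ratio(d.nonNullRequired, d.totalRequired);
  const accuracy     = 250 * ratio(d.validatedValues, d.totalValues);
  const consistency  = 250 * ratio(d.conformingRecords, d.totalRecords);
  // Freshness decays linearly to zero as data age approaches the SLA window.
  const timeliness   = Math.max(0, 250 * (1 - d.dataAgeHours / d.slaHours));
  const total = completeness + accuracy + consistency + timeliness;
  return { completeness, accuracy, consistency, timeliness, total, blocked: total < blockingThreshold };
}
```

A dataset with half its required fields null, half its values unvalidated, half its records non-conforming, and data exactly at SLA age scores 375 and is blocked from promotion.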
| Event Type | Trigger | Payload Fields | Audit Bus Topic |
|---|---|---|---|
| LINEAGE_SOURCE | Data first enters the system from an external source | source_id, source_type, schema_hash, record_count, timestamp | audit.data.lineage.source |
| LINEAGE_TRANSFORM | Agent or pipeline applies a transformation to data | node_id, agent_id, operation, input_ids[], output_ids[], transform_fn, duration_ms | audit.data.lineage.transform |
| LINEAGE_MERGE | Two or more data streams are joined or merged | node_id, input_ids[], join_keys[], output_id, merge_strategy | audit.data.lineage.merge |
| LINEAGE_OUTPUT | Data is written to a destination system or exposed via API | node_id, destination_type, destination_id, field_map, consumer_id | audit.data.lineage.output |
| LINEAGE_DELETE | Data is deleted or anonymized (GLBA/CCPA compliance) | node_id, reason, regulation_reference, authorized_by, deletion_scope | audit.data.lineage.delete |
| CLASSIFICATION_DETECTED | Classification engine identifies regulated data in motion | field_path, detected_tier, detected_type, confidence, pipeline_id | audit.data.classification |
| QUALITY_SCORED | Quality validation pipeline completes a scoring run | dataset_id, total_score, dimension_scores{}, failing_checks[], pipeline_id | audit.data.quality |
| PIPELINE_ANOMALY | Observability layer detects schema drift, volume anomaly, or SLA breach | pipeline_id, anomaly_type, severity, expected_value, actual_value, affected_consumers[] | audit.data.pipeline |
For regulated industries — insurance, banking, healthcare — it is not enough to track that data moved through a pipeline. Auditors require proof that calculations executed in the source system produce identical results in the target system and that mathematical integrity is preserved at every transformation stage. The reconciliation engine provides this proof.
Every pipeline run produces a reconciliation manifest: a signed document containing input record counts, output record counts, and deterministic hash digests (SHA-256) of the calculation inputs and outputs at each stage. The manifest is compared against the target system's independently computed values. Any mismatch — even a single cent — is flagged as a reconciliation break and blocks downstream consumption until resolved.
The engine captures every calculation's inputs, formula/logic reference, and outputs as a reproducible computation record. Auditors can select any record and trigger a replay: the engine re-executes the calculation using the captured inputs and the current formula version, then compares the result against the stored output. Drift between formula versions is surfaced with side-by-side comparison.
At every aggregation boundary (subtotals, group-by rollups, grand totals), the engine independently computes control totals from the underlying detail records and compares them against the aggregated values. This catches truncation errors, floating-point drift, rounding policy inconsistencies, and silent data loss from filtered records. Tolerances are configurable per field (e.g., ±$0.01 for currency, exact match for record counts).
Reconciliation results are published as structured reports in PDF and machine-readable JSON. Each report contains: pipeline run ID, timestamp, source system identifier, target system identifier, record counts (matched/unmatched/orphaned), control total comparisons, hash verification status, and a reconciliation verdict (PASS / PASS WITH EXCEPTIONS / FAIL). Exception details include the specific records and fields that broke, with full lineage trace back to source.
| Stage | What Is Compared | Proof Method |
|---|---|---|
| Extraction | Source record count vs extracted count | Row count + SHA-256 of sort-ordered primary keys |
| Transformation | Input values + formula → output values | Deterministic replay with captured inputs |
| Aggregation | Sum of details vs reported totals | Independent control total recomputation |
| Loading | Staged records vs committed records | Row count + hash digest of target table snapshot |
| End-to-End | Source grand totals vs target grand totals | Cross-system balance comparison with tolerance |
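The aggregation-stage proof above can be sketched as an independent recomputation of control totals from detail records, compared against stored aggregates under per-field tolerances. The types and names here are illustrative; amounts are kept in integer cents to avoid floating-point drift:

```typescript
// Sketch: recompute control totals from details and compare to stored aggregates.
interface DetailRecord { groupKey: string; amountCents: number; }
interface StoredAggregate { groupKey: string; totalCents: number; recordCount: number; }
interface AggBreak { groupKey: string; field: string; expected: number; actual: number; variance: number; }

function reconcileAggregates(
  details: DetailRecord[],
  stored: StoredAggregate[],
  currencyToleranceCents = 1, // ±$0.01 default; record counts must match exactly
): AggBreak[] {
  const recomputed = new Map<string, { totalCents: number; recordCount: number }>();
  for (const d of details) {
    const agg = recomputed.get(d.groupKey) ?? { totalCents: 0, recordCount: 0 };
    agg.totalCents += d.amountCents;
    agg.recordCount += 1;
    recomputed.set(d.groupKey, agg);
  }
  const breaks: AggBreak[] = [];
  for (const s of stored) {
    const r = recomputed.get(s.groupKey) ?? { totalCents: 0, recordCount: 0 };
    if (r.recordCount !== s.recordCount)
      breaks.push({ groupKey: s.groupKey, field: "recordCount", expected: r.recordCount, actual: s.recordCount, variance: s.recordCount - r.recordCount });
    if (Math.abs(r.totalCents - s.totalCents) > currencyToleranceCents)
      breaks.push({ groupKey: s.groupKey, field: "totalCents", expected: r.totalCents, actual: s.totalCents, variance: s.totalCents - r.totalCents });
  }
  return breaks;
}
```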
All eight data plane event types are published to the governance framework's central audit bus in real time. Events are signed with the originating agent's identity token and cannot be modified after publication. The audit bus maintains a cryptographic hash chain across events, providing tamper evidence for regulatory review.
The governance dashboard, compliance reporting module, and risk assessment engine all subscribe to data plane events. Classification violations trigger immediate alerts to the compliance officer workflow. A quality score drop below the blocking threshold pauses pipeline promotion and notifies the responsible agent orchestrator.
Twenty-four functional and non-functional requirements governing the Agentic Data Plane, numbered in REQ-DATA format.
Every data transformation executed by an agent — including schema mapping, field derivation, aggregation, and enrichment — must emit a LINEAGE_TRANSFORM event before the transformation completes. Transformations without a corresponding lineage event must not be applied to production data.
The lineage engine must maintain a complete, unbroken directed acyclic graph from every output field back to its source. A lineage query against any output field must resolve to at least one source system within 500ms for datasets up to 10 million records.
The quality validation pipeline must compute a 0–1000 point score for each dataset using four weighted dimensions: completeness, accuracy, consistency, and timeliness. Threshold values for alerts and pipeline blocking must be configurable per dataset, pipeline, and environment without code changes.
Any dataset scoring below the configured blocking threshold (default: 700/1000) must be prevented from promotion to production environments. The blocking decision must be logged to the governance audit bus as a QUALITY_SCORED event with the specific failing dimensions identified.
The continuous classification engine must scan all data in motion for PII and PHI at a minimum detection rate of 98.5% for the regulated field types defined in the insurance data type registry. Detection latency must not exceed 150ms per record, ensuring classification does not become a pipeline bottleneck.
The classification engine must include pre-trained classifiers for insurance-specific regulated data types: HIPAA-covered PHI fields, state DOI-regulated policyholder data, NAIC-defined financial data elements, and PCI-DSS payment card data. Classifiers must be versioned and updatable without pipeline downtime.
The pipeline observability layer must detect and alert on four anomaly classes: schema drift (field additions, removals, type changes), volume anomalies (>2σ deviation from baseline), null rate spikes (>10% increase from baseline), and SLA breaches (processing time exceeds configured maximum). Alerts must reach subscribers within 30 seconds of detection.
When schema drift is detected in an upstream pipeline, all registered downstream consumers must receive a structured notification including the pipeline ID, the nature of the change, the affected fields, and the detected-at timestamp. Consumers must acknowledge notifications within a configurable window or be flagged for manual review.
A REST and GraphQL API must expose lineage queries against the data plane. Minimum required operations: trace all inputs for a given output field; list all outputs derived from a given source; retrieve the transformation history for a dataset; and query lineage events within a time range. All API responses must include a query execution time in the response headers.
All data plane events — lineage, quality, classification, and pipeline health — must be published to the governance framework's central audit bus using the shared event schema. The data plane must not maintain a separate audit trail; the governance audit bus is the single source of truth for all data governance events.
A browser-based dashboard must render the lineage DAG as an interactive directed graph, supporting node expansion, edge traversal, and field-level drill-down. The dashboard must support search by output field name, dataset ID, and time range. The full lineage graph for a 1,000-node DAG must render in under 3 seconds.
Lineage metadata must be retained for a minimum of seven years to satisfy state insurance records retention requirements (reference: NAIC Model Records Retention Act). Retention policies must be configurable per data classification tier, with Restricted-tier lineage records subject to extended retention of ten years. Deletion must require dual-authorization and generate a LINEAGE_DELETE event.
The system must track quality scores over time per dataset and pipeline, computing rolling 7-day and 30-day trend lines. Datasets with declining quality trends — defined as a >5% score decrease over 30 days — must generate a proactive quality debt alert before the blocking threshold is reached, providing early warning for remediation.
The classification engine must report a confidence score (0.0–1.0) for each classification decision. Classifications with confidence below 0.75 must be flagged for human review rather than automatically enforced. A governance officer must be able to override a classification decision with a documented rationale, which is recorded to the audit bus.
Every lineage event emitted as the result of an agent action must include the agent's identity token and session ID. This binds data transformations to the specific agent instance and session that performed them, enabling forensic reconstruction of which agent — and which agent version — touched which data.
The data plane must support on-demand generation of structured regulatory reports including: complete data lineage for a policy record (for state DOI examination), PHI handling audit trail (for HIPAA compliance review), and data quality certification report (for rate filing submissions). Reports must be exportable in PDF and machine-readable JSON formats.
Every pipeline run SHALL produce a signed reconciliation manifest containing: input record count, output record count, SHA-256 hash digests of calculation inputs and outputs at each transformation stage, and a reconciliation verdict (PASS / PASS WITH EXCEPTIONS / FAIL). Any record count or hash mismatch SHALL block downstream consumption until the break is resolved and re-reconciled.
Every calculation executed in the pipeline SHALL capture its inputs, formula/logic reference (versioned), and outputs as a reproducible computation record. Auditors SHALL be able to trigger a replay of any record: the engine re-executes the calculation using the captured inputs and compares the result against the stored output. Formula version drift SHALL be surfaced with side-by-side comparison of old and current formula outputs.
At every aggregation boundary (subtotals, group-by rollups, grand totals), the reconciliation engine SHALL independently recompute control totals from the underlying detail records and compare against the stored aggregated values. Tolerance thresholds SHALL be configurable per field type: exact match for record counts, configurable currency tolerance (default ±$0.01) for monetary fields, and configurable percentage tolerance for derived metrics.
The reconciliation engine SHALL compare grand totals between the source system and the target system for every pipeline run. The comparison SHALL use independently computed values from each system (not passed-through totals) to detect silent data loss, truncation errors, floating-point drift, and rounding policy inconsistencies. Breaks SHALL include the specific field, expected value, actual value, and variance amount.
When a reconciliation break is detected, the engine SHALL create a structured break record containing: break ID, pipeline run ID, stage, field, expected value, actual value, variance, severity (blocking/warning), and full lineage trace from source to the point of divergence. Breaks SHALL be routable to the responsible agent orchestrator or human reviewer. Resolution (with root cause) SHALL be recorded and linked to the break record.
The engine SHALL maintain point-in-time snapshots of reconciliation state, enabling auditors to answer: "As of date X, what was the reconciliation status of pipeline Y?" Snapshots SHALL be immutable after creation and SHALL include the full reconciliation manifest, control totals, and break status. Retention SHALL align with regulatory requirements (minimum 7 years for insurance, 5 years for SOX).
Reconciliation results SHALL be exportable as structured reports in PDF (human-readable) and JSON (machine-readable). Each report SHALL contain: pipeline run ID, timestamp, source and target system identifiers, record counts (matched/unmatched/orphaned), control total comparisons per aggregation level, hash verification status per stage, reconciliation verdict, and exception details with lineage traces. Reports SHALL be signable with the same Ed25519 attestation used by the Code Hardener.
The reconciliation engine SHALL support continuous mode: every pipeline execution automatically reconciles and publishes results to the governance audit bus. Reconciliation failures SHALL trigger alerts configurable by severity. A reconciliation dashboard SHALL display current status, historical trends, break frequency by pipeline/stage, and mean time to resolution. SLA: reconciliation of a 1M-record pipeline SHALL complete within 60 seconds.
A complete Claude Code prompt to implement the Agentic Data Plane from scratch.
Build a production-ready Agentic Data Plane for insurance software systems generated by autonomous AI agents. This is a data governance infrastructure layer that runs beneath agent-generated systems, providing lineage tracking, quality validation, pipeline observability, continuous data classification, and financial audit-grade source-to-target reconciliation. Target environment: Node.js 20+ backend with TypeScript, PostgreSQL for lineage storage, Redis for real-time event streaming, and a React + D3.js lineage dashboard.
## Core Components to Build
### 1. Data Lineage Engine
Create a lineage tracking service at `src/lineage/LineageEngine.ts`:
- Implement a directed acyclic graph (DAG) data structure for data provenance using an adjacency list with `LineageNode` and `LineageEdge` types
- `LineageNode`: { node_id: string, operation: string, agent_id: string, session_id: string, inputs: string[], outputs: string[], transform_fn: string, schema_hash: string, timestamp: ISO8601, metadata: Record<string, unknown> }
- `LineageEdge`: { from: string, to: string, field_map: Record<string, string>, transform_applied: string }
- Implement `LineageEngine.record(event: LineageEvent): Promise<void>` — appends to both in-memory DAG and PostgreSQL persistence
- Implement `LineageEngine.trace(output_field_id: string): Promise<LineageReport>` — backward traversal from output to all source nodes, returning the complete path with < 500ms SLA
- Implement `LineageEngine.query(filters: LineageQueryFilters): Promise<LineageEvent[]>` — supports filtering by time range, agent_id, source_system, and output_destination
- Persist all lineage nodes and edges in PostgreSQL tables `lineage_nodes` and `lineage_edges` with appropriate indexes on node_id, agent_id, and timestamp
- Emit all lineage events to the governance audit bus (Redis Streams topic `audit:data:lineage`)
- Enforce 7-year retention for standard data, 10-year for Restricted-tier; implement `LineageEngine.purge(before: Date, tier: DataTier)` with dual-authorization requirement (two officer tokens required)
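The backward traversal behind `trace` can be sketched as a breadth-first walk over reverse edges; the in-memory structure here is a simplified stand-in for the full `LineageNode`/`LineageEdge` types:

```typescript
// Sketch: adjacency-list DAG with backward (output → sources) traversal.
interface DagNode { id: string; operation: string; }
interface DagEdge { from: string; to: string; }

class LineageDAG {
  private parents = new Map<string, string[]>(); // to -> [from]
  private nodes = new Map<string, DagNode>();

  add(node: DagNode, incoming: DagEdge[]): void {
    this.nodes.set(node.id, node);
    for (const e of incoming) {
      const list = this.parents.get(e.to) ?? [];
      list.push(e.from);
      this.parents.set(e.to, list);
    }
  }

  // Breadth-first walk upstream; visits each ancestor exactly once.
  trace(outputId: string): string[] {
    const seen = new Set<string>();
    const queue = [outputId];
    const path: string[] = [];
    while (queue.length > 0) {
      const id = queue.shift()!;
      if (seen.has(id)) continue;
      seen.add(id);
      path.push(id);
      queue.push(...(this.parents.get(id) ?? []));
    }
    return path;
  }
}
```

In the full engine this walk runs against the PostgreSQL adjacency tables with a recursive CTE; the in-memory form illustrates the traversal order only.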
### 2. Quality Validation Pipeline
Create `src/quality/QualityValidator.ts`:
- Implement four scoring dimensions, each 0–250 points, summing to a 0–1000 total:
- Completeness: count non-null required fields / total required fields, weighted by field criticality
- Accuracy: validate values against reference datasets (NAIC codes, ISO codes, VIN patterns, ZIP+4 format)
- Consistency: check cross-field business rules (effective_date < expiry_date, premium > 0, deductible <= limit, valid state/LOB combinations)
- Timeliness: freshness_score = max(0, 250 * (1 - data_age_hours / sla_hours))
- Implement `QualityValidator.score(dataset: Dataset, config: QualityConfig): Promise<QualityScore>`
- `QualityScore`: { dataset_id, total_score, completeness, accuracy, consistency, timeliness, failing_checks: FailingCheck[], timestamp }
- Implement threshold enforcement: if total_score < config.blocking_threshold (default 700), reject promotion with `QualityBlockError` and publish `audit:data:quality` event
- Implement trend tracking: store score history in PostgreSQL `quality_scores` table; compute 7-day and 30-day rolling averages; emit `QUALITY_DEBT_ALERT` when 30-day trend shows > 5% decline before blocking threshold is reached
- Expose `QualityValidator.report(dataset_id: string, from: Date, to: Date): Promise<QualityReport>` for regulatory report generation
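The quality-debt trend check can be sketched as a comparison of two trailing windows of score history. The window semantics below (last 30 samples vs. the 30 before them) are one plausible reading of the ">5% decline over 30 days" requirement:

```typescript
// Sketch: alert when the recent 30-sample mean has declined >5% vs. the prior window.
function qualityDebtAlert(scores: number[], declinePct = 5): boolean {
  if (scores.length < 60) return false; // need two full 30-sample windows
  const mean = (xs: number[]) => xs.reduce((a, b) => a + b, 0) / xs.length;
  const recent = mean(scores.slice(-30));
  const prior = mean(scores.slice(-60, -30));
  if (prior === 0) return false;
  return (prior - recent) / prior > declinePct / 100;
}
```

This fires before the blocking threshold is hit: a dataset drifting from 900 to 800 alerts on trend even though 800 still clears the 700 block.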
### 3. Pipeline Observability Layer
Create `src/observability/PipelineObserver.ts`:
- Register pipelines with `PipelineObserver.register(pipeline: PipelineDefinition)` — stores baseline schema, expected volume range, null rate baselines, and SLA config
- Implement real-time monitoring via `PipelineObserver.observe(pipeline_id: string, batch: DataBatch): Promise<void>`:
- Schema drift: deep-compare current schema to baseline; detect field additions, removals, type changes, and required/optional status changes
- Volume anomaly: compare batch record count to rolling 30-day baseline; flag if deviation > 2σ
- Null rate spike: compute per-field null rates; flag any field with > 10% increase from baseline
- SLA breach: measure processing time; flag if exceeds `pipeline.max_processing_ms`
- On anomaly detection: emit `PipelineAlert` to Redis Streams topic `audit:data:pipeline`; notify all registered downstream consumers via webhook with retry (3 attempts, exponential backoff)
- Consumers acknowledge via `PipelineObserver.acknowledge(pipeline_id: string, consumer_id: string, alert_id: string)` — unacknowledged alerts after configurable TTL escalate to manual review queue
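Two of the anomaly checks above — schema drift and volume deviation — can be sketched as pure functions; the flat `Schema` shape (field name to type string) is a simplification of the full baseline record:

```typescript
// Sketch: schema drift diff plus 2-sigma volume anomaly check.
type Schema = Record<string, string>; // field name -> type

interface SchemaDrift { added: string[]; removed: string[]; typeChanged: string[]; }

function detectSchemaDrift(baseline: Schema, current: Schema): SchemaDrift {
  const drift: SchemaDrift = { added: [], removed: [], typeChanged: [] };
  for (const field of Object.keys(current)) {
    if (!(field in baseline)) drift.added.push(field);
    else if (baseline[field] !== current[field]) drift.typeChanged.push(field);
  }
  for (const field of Object.keys(baseline)) {
    if (!(field in current)) drift.removed.push(field);
  }
  return drift;
}

// Flag any batch whose record count deviates more than 2σ from the rolling baseline.
function isVolumeAnomaly(count: number, baselineMean: number, baselineStdDev: number): boolean {
  return Math.abs(count - baselineMean) > 2 * baselineStdDev;
}
```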
### 4. Continuous Classification Engine
Create `src/classification/ClassificationEngine.ts`:
- Implement field-level classification scanning for data in motion
- Build pattern-matching + ML hybrid classifiers for:
- PII: SSN (XXX-XX-XXXX pattern + Luhn-variant check), DOB, name+address combos, email, phone
- PHI (HIPAA): medical record numbers, diagnosis codes (ICD-10 format), treatment dates co-located with patient identifiers, provider NPI numbers
- Financial: payment card numbers (Luhn validation), bank account numbers, routing numbers
- Insurance-specific NAIC fields: NAIC company codes, FEIN/TIN in insurance context, adjuster license numbers, DOI filing reference numbers
- Each classifier returns `ClassificationResult { field_path, detected_tier: DataTier, detected_type: string, confidence: number (0.0-1.0), evidence: string[] }`
- Classifications with confidence < 0.75 are flagged `NEEDS_REVIEW` rather than auto-enforced
- Implement `ClassificationEngine.scan(data_stream: AsyncIterable<DataRecord>): AsyncIterable<ClassificationResult>` — streaming API, max 150ms latency per record
- Publish all classification results to `audit:data:classification` Redis Stream
- Implement override API: `ClassificationEngine.override(result_id: string, new_tier: DataTier, rationale: string, officer_token: string)` — records override to audit bus with officer identity
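One of the pattern-based classifiers can be sketched end to end: payment-card detection via a digit-pattern match plus Luhn checksum, returning the `ClassificationResult` shape above. The confidence values here are illustrative placeholders, not tuned figures:

```typescript
// Sketch: payment-card classifier (pattern + Luhn), confidence gated at 0.75.
interface ClassificationResult {
  field_path: string;
  detected_tier: "T0" | "T1" | "T2" | "T3";
  detected_type: string;
  confidence: number;
  evidence: string[];
}

function luhnValid(digits: string): boolean {
  let sum = 0;
  let double = false;
  for (let i = digits.length - 1; i >= 0; i--) {
    let d = digits.charCodeAt(i) - 48;
    if (double) { d *= 2; if (d > 9) d -= 9; }
    sum += d;
    double = !double;
  }
  return sum % 10 === 0;
}

function classifyCardField(fieldPath: string, value: string): ClassificationResult | null {
  const digits = value.replace(/[\s-]/g, "");
  if (!/^\d{13,19}$/.test(digits)) return null;
  const passesLuhn = luhnValid(digits);
  return {
    field_path: fieldPath,
    detected_tier: "T3",
    detected_type: "PAYMENT_CARD",
    // Pattern match alone stays below the 0.75 auto-enforcement threshold and
    // routes to human review; a valid Luhn checksum raises it above.
    confidence: passesLuhn ? 0.95 : 0.6,
    evidence: passesLuhn ? ["13-19 digit pattern", "Luhn checksum valid"] : ["13-19 digit pattern"],
  };
}
```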
### 5. Lineage Query REST + GraphQL API
Create `src/api/LineageAPI.ts` using Express + Apollo Server:
REST endpoints:
- `GET /api/v1/lineage/trace/:output_field_id` — full backward trace to sources
- `GET /api/v1/lineage/sources/:source_id/outputs` — all outputs derived from a source
- `GET /api/v1/lineage/datasets/:dataset_id/history` — transformation history
- `GET /api/v1/lineage/events?from=&to=&agent_id=&pipeline_id=` — time-range event query
- `GET /api/v1/lineage/report/:policy_id` — regulatory lineage report for a policy record
- `GET /api/v1/quality/datasets/:dataset_id/trend?days=30` — quality trend report
- `POST /api/v1/reports/regulatory` — generate formatted regulatory report (PDF/JSON)
GraphQL schema: expose `lineageTrace`, `lineageSearch`, `qualityScore`, `classificationHistory`, `pipelineHealth` queries with full type definitions.
All API responses must include `X-Query-Time-Ms` header. Authentication via JWT with role-based access (viewer, analyst, compliance_officer, admin).
### 6. Lineage Dashboard
Create `src/dashboard/` as a React + TypeScript SPA:
- Use D3.js for DAG visualization; render nodes as rectangles with color-coded tiers (T0=gray, T1=blue, T2=orange, T3=red)
- Support interactive features: node click to expand/collapse; edge hover to show transform details; field-level drill-down panel
- Search bar: query by output field name, dataset ID, time range; results highlight matching subgraph
- Performance: 1,000-node DAG must render in < 3 seconds using WebWorker for layout computation
- Quality panel: show sparkline of 30-day quality trend per selected dataset
- Pipeline health panel: show last 24h of pipeline observations with anomaly indicators
- Classification coverage: show heatmap of classification tier distribution across active pipelines
### 7. Governance Audit Bus Integration
Create `src/audit/AuditBusClient.ts`:
- Wrap Redis Streams with typed publish methods for each event type
- Implement `AuditBusClient.publish(event: AuditEvent): Promise<void>` with retry logic (3 attempts) and dead-letter queue for failed publishes
- Sign all events with HMAC-SHA256 using the service's private key before publishing — consumers can verify integrity
- Implement hash-chained event IDs: each event's ID incorporates a hash of the previous event in the stream, providing tamper evidence
- Subscribe methods: `AuditBusClient.subscribe(topic: string, handler: EventHandler): () => void` — returns unsubscribe function
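The hash-chained IDs and HMAC signing described above can be sketched with Node's built-in `crypto` module. The `"GENESIS"` seed for the first event and the exact serialization fed into the hash are assumptions:

```typescript
import { createHash, createHmac } from "node:crypto";

// Each event ID folds in a hash of the previous ID, so deleting or
// reordering an event breaks the chain -- tamper evidence, not prevention.
class EventChain {
  private prevId = "GENESIS"; // hypothetical seed for the first event

  nextId(serializedEvent: string): string {
    this.prevId = createHash("sha256")
      .update(this.prevId)
      .update(serializedEvent)
      .digest("hex");
    return this.prevId;
  }
}

// HMAC-SHA256 over the serialized event; a consumer holding the same
// shared secret recomputes this value to verify integrity.
function signEvent(serializedEvent: string, secret: string): string {
  return createHmac("sha256", secret).update(serializedEvent).digest("hex");
}
```

Note that HMAC verification requires distributing the shared secret to consumers; if consumers must verify without it, an asymmetric scheme (e.g. Ed25519, as the reconciliation manifests use) would be the alternative.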
### 8. Financial Reconciliation Engine
Create `src/reconciliation/ReconciliationEngine.ts`:
- Implement `ReconciliationEngine.reconcile(pipelineRunId: string): Promise<ReconciliationManifest>` — compares source and target at every stage
- At extraction: compare source record count vs extracted count, SHA-256 of sort-ordered primary keys
- At transformation: capture inputs + formula reference + outputs as `ComputationRecord`, support deterministic replay via `ReconciliationEngine.replay(recordId: string)`
- At aggregation: independently recompute control totals from detail records, compare against stored aggregates with configurable tolerance (exact for counts, ±$0.01 for currency)
- At loading: compare staged record count/hash vs committed count/hash in target system
- End-to-end: compare source grand totals vs target grand totals using independently computed values
- `ReconciliationManifest`: `{ run_id, timestamp, source_system, target_system, stages: StageResult[], record_counts: { source, target, matched, unmatched, orphaned }, control_totals: ControlTotal[], verdict: 'PASS' | 'PASS_WITH_EXCEPTIONS' | 'FAIL', breaks: Break[], signature: Ed25519Signature }`
- Break resolution: `ReconciliationEngine.resolveBreak(breakId: string, rootCause: string): Promise<void>` — records resolution with lineage trace
- Continuous mode: auto-reconcile on every pipeline run, publish results to audit bus topic `audit:data:reconciliation`
- Report generation: `ReconciliationEngine.generateReport(manifestId: string, format: 'pdf' | 'json'): Promise<Buffer>`
- Point-in-time snapshots: immutable, queryable by date range, minimum 7-year retention for insurance compliance
- Reconciliation dashboard: current status, historical trends, break frequency, MTTR per pipeline
- SLA: reconciliation of 1M-record pipeline completes within 60 seconds
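The control-total comparison rule above (exact for counts, ±$0.01 for currency) can be sketched as follows; comparing currency in integer cents to sidestep floating-point drift is our assumption, not mandated by the spec:

```typescript
type TotalKind = "count" | "currency";

// Counts must match exactly; currency totals tolerate a one-cent difference.
function totalsMatch(source: number, target: number, kind: TotalKind): boolean {
  if (kind === "count") return source === target;
  // Round each side to integer cents before differencing, so binary
  // floating-point representation error cannot manufacture a break.
  const diffCents = Math.abs(Math.round(source * 100) - Math.round(target * 100));
  return diffCents <= 1;
}
```

In a production engine, currency would more likely be carried as integer cents (or a decimal type) end to end, making the rounding here unnecessary.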
## Database Schema (PostgreSQL)
Create migration files in `migrations/`:
```sql
-- Lineage nodes
CREATE TABLE lineage_nodes (
  node_id UUID PRIMARY KEY,
  operation VARCHAR(100) NOT NULL,
  agent_id VARCHAR(100) NOT NULL,
  session_id VARCHAR(100) NOT NULL,
  schema_hash VARCHAR(64),
  tier VARCHAR(20),
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  metadata JSONB
);
CREATE INDEX idx_lineage_nodes_agent ON lineage_nodes(agent_id);
CREATE INDEX idx_lineage_nodes_created ON lineage_nodes(created_at);

-- Lineage edges
CREATE TABLE lineage_edges (
  edge_id UUID PRIMARY KEY,
  from_node UUID REFERENCES lineage_nodes(node_id),
  to_node UUID REFERENCES lineage_nodes(node_id),
  field_map JSONB,
  transform_applied TEXT,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_lineage_edges_from ON lineage_edges(from_node);
CREATE INDEX idx_lineage_edges_to ON lineage_edges(to_node);

-- Quality scores
CREATE TABLE quality_scores (
  score_id UUID PRIMARY KEY,
  dataset_id VARCHAR(200) NOT NULL,
  pipeline_id VARCHAR(200),
  total_score SMALLINT NOT NULL,
  completeness SMALLINT,
  accuracy SMALLINT,
  consistency SMALLINT,
  timeliness SMALLINT,
  failing_checks JSONB,
  scored_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_quality_dataset ON quality_scores(dataset_id, scored_at DESC);

-- Classification records
CREATE TABLE classification_records (
  record_id UUID PRIMARY KEY,
  field_path TEXT NOT NULL,
  pipeline_id VARCHAR(200),
  detected_tier VARCHAR(20) NOT NULL,
  detected_type VARCHAR(100),
  confidence NUMERIC(4,3),
  status VARCHAR(20) DEFAULT 'AUTO',
  override_rationale TEXT,
  override_officer VARCHAR(100),
  detected_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_classification_pipeline ON classification_records(pipeline_id, detected_at DESC);
```
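Against the `lineage_edges` table above, a lineage trace is naturally a recursive CTE. One way to build the query text in TypeScript — the depth cap as a cycle guard and the parameter positions are assumptions:

```typescript
// Builds the SQL for an upstream or downstream lineage traversal.
// $1 = starting node_id, $2 = maximum traversal depth (cycle guard).
function lineageTraceSql(direction: "upstream" | "downstream"): string {
  // Downstream: follow edges leaving the node; upstream: edges arriving at it.
  const [start, next] =
    direction === "downstream" ? ["from_node", "to_node"] : ["to_node", "from_node"];
  return `
    WITH RECURSIVE trace AS (
      SELECT edge_id, from_node, to_node, 1 AS depth
      FROM lineage_edges
      WHERE ${start} = $1
      UNION ALL
      SELECT e.edge_id, e.from_node, e.to_node, t.depth + 1
      FROM lineage_edges e
      JOIN trace t ON e.${start} = t.${next}
      WHERE t.depth < $2
    )
    SELECT * FROM trace;`;
}
```

The depth cap bounds runaway traversal but does not deduplicate revisited nodes; a production version would also track visited `node_id`s (e.g. via an array column in the CTE) to terminate true cycles.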
## Configuration
Use environment variables for all configuration. Provide a `.env.example` with all required variables:
- `DATABASE_URL`, `REDIS_URL`, `AUDIT_BUS_SERVICE_KEY`
- `QUALITY_BLOCKING_THRESHOLD` (default: 700)
- `CLASSIFICATION_MIN_CONFIDENCE` (default: 0.75)
- `LINEAGE_RETENTION_YEARS_STANDARD` (default: 7)
- `LINEAGE_RETENTION_YEARS_RESTRICTED` (default: 10)
- `PIPELINE_ANOMALY_SIGMA_THRESHOLD` (default: 2.0)
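A minimal loader for the numeric variables with their documented defaults might look like this; the fail-fast policy on malformed values is an assumption:

```typescript
// Reads a numeric env var, falling back to the documented default;
// throws on malformed values rather than silently coercing them.
function numFromEnv(
  env: Record<string, string | undefined>,
  name: string,
  fallback: number,
): number {
  const raw = env[name];
  if (raw === undefined || raw.trim() === "") return fallback;
  const n = Number(raw);
  if (Number.isNaN(n)) throw new Error(`Invalid number for ${name}: "${raw}"`);
  return n;
}

// Defaults mirror the .env.example values listed above.
function loadConfig(env: Record<string, string | undefined>) {
  return {
    qualityBlockingThreshold: numFromEnv(env, "QUALITY_BLOCKING_THRESHOLD", 700),
    classificationMinConfidence: numFromEnv(env, "CLASSIFICATION_MIN_CONFIDENCE", 0.75),
    lineageRetentionYearsStandard: numFromEnv(env, "LINEAGE_RETENTION_YEARS_STANDARD", 7),
    lineageRetentionYearsRestricted: numFromEnv(env, "LINEAGE_RETENTION_YEARS_RESTRICTED", 10),
    pipelineAnomalySigmaThreshold: numFromEnv(env, "PIPELINE_ANOMALY_SIGMA_THRESHOLD", 2.0),
  };
}
```

In the service itself this would be called once at startup with `process.env`, so a bad value aborts boot instead of surfacing mid-pipeline.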
## Testing Requirements
- Unit tests for all validators, classifiers, and lineage traversal logic (Jest)
- Integration tests for API endpoints using supertest with a test database
- Performance test: lineage trace on 1M-node DAG must complete in < 500ms
- Classification accuracy test: run built-in test corpus of 500 labeled insurance records; assert > 98.5% detection rate for T3 fields
## Deliverables
1. Complete TypeScript source with no stubs or TODOs
2. Docker Compose file with PostgreSQL, Redis, API service, and dashboard
3. Database migration files
4. API documentation (OpenAPI 3.0 YAML)
5. README with setup instructions, environment variable reference, and regulatory compliance notes
6. Test suite with > 90% coverage on core components
Do not use any hardcoded secrets. All sensitive configuration via environment variables. No arbitrary code execution from user input in the lineage or classification APIs.
## Framework Integration
How the Agentic Data Plane connects to the other ten BulletproofSoftware.ai components.
When agents store embeddings or retrieve memories that inform data transformations, those memory operations are recorded as lineage nodes. If an agent uses a retrieved policy template to generate a new claims form, the memory retrieval event becomes a source node in the lineage DAG. This creates a complete provenance chain from institutional knowledge through to generated output — critical for explaining why a system was built a certain way during a regulatory examination.
The data plane is a first-class publisher on the governance framework's central audit bus. All eight lineage event types, quality scores, classification results, and pipeline health events flow into the same audit trail as code governance events. The data classification tiers (T0–T3) defined in the data plane are the authoritative classification system referenced by all other framework components — not a separate data-plane-specific taxonomy.
When the code assurance system reviews agent-generated code, it validates that data access patterns comply with the classification tier of the data being accessed. Code that reads T3 (Restricted) fields without the appropriate access pattern annotations fails the code quality gate. The data plane's classification registry is the reference source for these code-level checks — the two systems share a classification schema rather than maintaining independent taxonomies.
The BulletproofSoftware.ai master dashboard embeds the lineage DAG viewer as a panel, with drill-down links from any system component to its data lineage. A compliance officer viewing the governance dashboard can click on any generated system and navigate directly to its data lineage explorer without switching contexts. The dashboard also surfaces the 30-day quality trend sparklines and classification coverage heatmap as top-level metrics alongside code quality and deployment frequency.
When the quality validation pipeline blocks a dataset from production promotion, it signals the agent orchestrator via a structured event on the audit bus. The orchestrator can respond by pausing dependent agents, triggering a data remediation workflow, or escalating to a human operator. The blocking threshold is configurable per pipeline context — a regulatory filing pipeline may have a stricter threshold than an internal analytics pipeline.
State DOI examinations, HIPAA audits, and NAIC data calls require specific data provenance documentation. The data plane's regulatory report generation endpoint produces structured reports in the format expected by each regulatory body, drawing on the lineage DAG, classification records, and quality history. These reports are generated on-demand and do not require manual data assembly by compliance staff — reducing examination response time from weeks to hours.