Lineage Operations
Lineage captures source-to-target movement plus execution context (pipeline_id, model_id, trace_id, purpose) to make downstream data usage traceable.
What lineage stores
- Job events (
sys.lineage_jobs) - Edge events (
sys.lineage_edges) - Optional projected durable stores (index/table sink configuration)
Enable lineage sink
SET GLOBAL TRANSIENT
lineage.enabled = true,
lineage.sink.mode = 'async',
"lineage.sink.index.enabled" = true,
"lineage.sink.index.jobs_table" = 'doc.lineage_jobs_store_e2e',
"lineage.sink.index.edges_table" = 'doc.lineage_edges_store_e2e',
"lineage.sink.index.shards" = 1,
"lineage.sink.index.replicas" = '0',
"lineage.sink.index.partition_by" = 'day';
Optional projected tables
CREATE TABLE IF NOT EXISTS doc.lineage_jobs_store_e2e (
event_id STRING,
job_id STRING,
user_id STRING,
timestamp LONG,
query_hash STRING,
status STRING,
pipeline_id STRING,
model_id STRING,
dataset_version STRING,
purpose STRING,
trace_id STRING,
event_bucket STRING,
PRIMARY KEY (event_id, event_bucket)
)
PARTITIONED BY (event_bucket)
CLUSTERED INTO 1 SHARDS
WITH (number_of_replicas = 0);
CREATE TABLE IF NOT EXISTS doc.lineage_edges_store_e2e (
event_id STRING,
job_id STRING,
source_table STRING,
source_columns ARRAY(STRING),
target_table STRING,
target_columns ARRAY(STRING),
transform_metadata OBJECT(DYNAMIC),
timestamp LONG,
pipeline_id STRING,
model_id STRING,
dataset_version STRING,
purpose STRING,
trace_id STRING,
event_bucket STRING,
PRIMARY KEY (event_id, event_bucket)
)
PARTITIONED BY (event_bucket)
CLUSTERED INTO 1 SHARDS
WITH (number_of_replicas = 0);
Generate lineage
As testuser:
SET SESSION lineage.enabled = true;
SET SESSION pipeline_id = 'pipeline-ctx';
SET SESSION model_id = 'model-ctx';
SET SESSION dataset_version = 'v-ctx';
SET SESSION purpose = 'fraud_scoring';
SET SESSION trace_id = 'trace-ctx';
INSERT INTO doc.l_src (id) VALUES (1), (2);
INSERT INTO doc.l_dst (id) SELECT id FROM doc.l_src;
Inspect lineage
SELECT source_table, target_table, transform_metadata['operation']
FROM sys.lineage_edges
WHERE source_table = 'doc.l_src' AND target_table = 'doc.l_dst'
ORDER BY timestamp DESC
LIMIT 5;
SELECT job_id, pipeline_id, model_id, dataset_version, purpose, trace_id
FROM sys.lineage_jobs
ORDER BY timestamp DESC
LIMIT 5;
Projected table checks:
REFRESH TABLE doc.lineage_edges_store_e2e;
REFRESH TABLE doc.lineage_jobs_store_e2e;
SELECT job_id, source_table, target_table, transform_metadata['operation']
FROM doc.lineage_edges_store_e2e
ORDER BY timestamp DESC
LIMIT 10;
SELECT job_id, pipeline_id, model_id, dataset_version, purpose, trace_id
FROM doc.lineage_jobs_store_e2e
ORDER BY timestamp DESC
LIMIT 10;
Sink health checks
SELECT mode,
queue_depth,
queue_size,
sink_lag_ms,
retry_count,
dropped_events,
spool_replay_count
FROM sys.lineage_sink_metrics
LIMIT 1;
Troubleshooting
- No lineage rows:
- confirm
lineage.enabledat session/cluster scopes. - confirm statements perform actual source-target data movement.
- High sink lag:
- tune async queue/batch settings and reduce burst pressure.
- Missing context fields:
- verify
pipeline_id,model_id,dataset_version,purpose,trace_idare set before execution.