Övervaka DLT-pipelines
Den här artikeln beskriver hur du använder inbyggda funktioner för övervakning och observerbarhet för DLT-pipelines. Dessa funktioner stöder uppgifter som:
- Övervaka förloppet och statusen för pipelineuppdateringar. Se Vilken pipeline-information är tillgänglig i det användargränssnittet?.
- Aviseringar om pipelinehändelser, till exempel lyckade eller misslyckade pipelineuppdateringar. Se Lägg till e-postaviseringar för pipelinehändelser.
- Visa mått för strömmande källor som Apache Kafka och Auto Loader (offentlig förhandsversion). Se Visa strömningsmått.
- Extrahera detaljerad information om pipelineuppdateringar som dataursprung, datakvalitetsmätvärden och resursanvändning. Se Vad är DLT-händelseloggen?.
- Definiera anpassade åtgärder som ska utföras när specifika händelser inträffar. Se Definiera anpassad övervakning av DLT-pipelines med händelseutlösare.
Information om hur du inspekterar och diagnostiserar frågeprestanda finns i Access-frågehistorik för DLT-pipelines. Den här funktionen finns i offentlig förhandsversion.
Lägga till e-postaviseringar för pipelinehändelser
Du kan konfigurera en eller flera e-postadresser för att ta emot meddelanden när följande inträffar:
- Pipelineuppdateringen har slutförts framgångsrikt.
- En pipelineuppdatering misslyckas, antingen med ett återförsöksbart fel eller ett fel som inte kan åtgärdas med ett nytt försök. Välj det här alternativet om du vill få ett meddelande om alla pipelinefel.
- En pipelineuppdatering misslyckas med ett fel som inte går att försöka igen (allvarligt). Välj det här alternativet om du bara vill få ett meddelande när ett fel som inte kan försökas igen inträffar.
- Ett enskilt dataflöde misslyckas.
Så här konfigurerar du e-postaviseringar när du skapar eller redigerar en pipeline:
- Klicka på Lägg till meddelande.
- Ange en eller flera e-postadresser för att ta emot meddelanden.
- Klicka på kryssrutan för varje meddelandetyp som ska skickas till de konfigurerade e-postadresserna.
- Klicka på Lägg till meddelande.
Vilken pipelineinformation är tillgänglig i användargränssnittet?
Pipelinediagrammet visas så snart en uppdatering av en pipeline har startat framgångsrikt. Pilar representerar beroenden mellan datauppsättningar i pipelinen. Som standardinställning visar detaljsidan för pipeline den senaste uppdateringen för tabellen, men du kan välja äldre uppdateringar från en nedrullningsbar meny.
Information omfattar pipeline-ID, källkod, beräkningskostnad, produktutgåva och kanalen som konfigurerats för pipelinen.
Om du vill se en tabellvy över datauppsättningar klickar du på fliken Lista. Med List-vyn kan du se alla datauppsättningar i pipelinen som visas som en rad i en tabell och är användbart när din pipeline DAG är för stor för att visualisera i Graph-vyn. Du kan styra de datauppsättningar som visas i tabellen med hjälp av flera filter, till exempel datauppsättningens namn, typ och status. Om du vill växla tillbaka till DAG-visualiseringen klickar du på Graph.
Kör som-användare är pipelineägaren och pipelineuppdateringar körs med den här användarens behörigheter. För att ändra informationen för run as
användaren klickar du på Behörigheter och byter ut pipelineägaren.
Hur kan du visa datamängdsinformation?
Om du klickar på en datauppsättning i pipelinediagrammet eller datamängdslistan visas information om datamängden. Information omfattar datamängdsschemat, datakvalitetsmått och en länk till källkoden som definierar datamängden.
Visa uppdateringshistorik
Om du vill visa historik och status för pipelineuppdateringar klickar du på den nedrullningsbara menyn uppdateringshistorik i det övre fältet.
Välj uppdateringen i den nedrullningsbara menyn för att visa grafen, informationen och händelserna för en uppdatering. Om du vill återgå till den senaste uppdateringen klickar du på Visa den senaste uppdateringen.
Visa strömningsmått
Du kan visa strömmande mått från de datakällor som stöds av Spark Structured Streaming, som Apache Kafka, Amazon Kinesis, Auto Loader och Delta-tabeller, för varje strömningsflöde i din DLT-pipeline. Mått visas som diagram i DLT-användargränssnittets högra fönster och inkluderar eftersläpning i sekunder, eftersläpning i byte, eftersläpning i poster och eftersläpningsfiler. Diagrammen visar de maximala värdena aggregerade per minut och ett verktygstips visar maximala värden när du för muspekaren över diagrammet. Data är begränsade till de senaste 48 timmarna från den aktuella tiden.
Tabeller i din pipeline med tillgängliga strömningsmått visar ikonen när du visar pipeline-DAG:en i användargränssnittet i Graph-vyn. Om du vill visa strömningsmåtten klickar du på ikonen
för att visa det strömmande måttdiagrammet på fliken Flöden i den högra rutan. Du kan också använda ett filter för att endast visa tabeller med strömmande mått genom att klicka på List och sedan klicka på Har strömningsmått.
Varje strömmande källa stöder endast specifika mått. Mått som inte stöds av en strömmande källa är inte tillgängliga för visning i användargränssnittet. I följande tabell visas de mått som är tillgängliga för strömmande källor som stöds:
källa | kvarvarande byte | poster i eftersläpning | kösekunder | kvarvarande filer |
---|---|---|---|---|
Kafka | ✓ | ✓ | ||
Kinesis | ✓ | ✓ | ||
Delta | ✓ | ✓ | ||
Automatisk inläsning | ✓ | ✓ | ||
Google Pub/Sub | ✓ | ✓ |
Vad är DLT-händelseloggen?
DLT-händelseloggen innehåller all information som rör en pipeline, inklusive granskningsloggar, datakvalitetskontroller, pipelineförlopp och data härkomst. Du kan använda händelseloggen för att spåra, förstå och övervaka tillståndet för dina datapipelines.
Du kan visa händelseloggposter i DLT-användargränssnittet, DLT APIeller genom att fråga händelseloggen direkt. Det här avsnittet fokuserar på att fråga händelseloggen direkt.
Du kan också definiera anpassade åtgärder som ska köras när händelser loggas, till exempel att skicka aviseringar, med händelsekrokar.
Viktig
Ta inte bort händelseloggen eller den överordnade katalogen eller schemat där händelseloggen publiceras. Om du tar bort händelseloggen kan det leda till att pipelinen inte uppdateras under framtida körningar.
Händelseloggschema
I följande tabell beskrivs schemat för händelseloggen. Vissa av dessa fält innehåller JSON-data som kräver parsning för att utföra vissa frågor, till exempel fältet details
. Azure Databricks stöder :
-operatorn för att parsa JSON-fält. Se :
(kolon) operatör.
Fält | Beskrivning |
---|---|
id |
En unik identifierare för händelseloggposten. |
sequence |
Ett JSON-dokument som innehåller metadata för att identifiera och beställa händelser. |
origin |
Ett JSON-dokument som innehåller metadata för händelsens ursprung, till exempel molnleverantören, molnproviderregionen, user_id , pipeline_id eller pipeline_type för att visa var pipelinen skapades, antingen DBSQL eller WORKSPACE . |
timestamp |
Den tid då händelsen spelades in. |
message |
Ett läsbart meddelande som beskriver händelsen. |
level |
Händelsetypen, till exempel INFO , WARN , ERROR eller METRICS . |
maturity_level |
Händelseschemats stabilitet. Möjliga värden är:
|
error |
Om ett fel har uppstått finns information som beskriver felet. |
details |
Ett JSON-dokument som innehåller strukturerad information om händelsen. Det här är det primära fältet som används för att analysera händelser. |
event_type |
Händelsetyp |
Sök i händelseloggen
Obs
I det här avsnittet beskrivs standardbeteendet och syntaxen för att arbeta med händelseloggar för pipelines som konfigurerats med Unity Catalog och standardpubliceringsläget.
- Information om Unity Catalog-pipelines som använder äldre publiceringsläge finns i Så här arbetar du med händelselogg för Unity Catalog-pipelines i äldre publiceringsläge.
- Beteende och syntax för Hive-metaarkivpipelines finns i Arbeta med händelseloggen för Hive-metaarkivpipelines.
Som standard skriver DLT händelseloggen till en dold Delta-tabell i standardkatalogen och schemat som konfigurerats för pipelinen. Även om tabellen är dold kan den fortfarande efterfrågas av alla tillräckligt privilegierade användare. Som standard kan endast ägaren av pipelinen köra frågor mot händelseloggtabellen.
Som standard formateras namnet på den dolda händelseloggen som event_log_{pipeline_id}
, där pipeline-ID:t är det systemtilldelade UUID:t med streckat ersatt av understreck.
Du kan interagera med JSON-konfigurationen för att publicera händelseloggen. När du publicerar en händelselogg anger du namnet på händelseloggen och kan också ange en katalog och ett schema, som i följande exempel:
{
"id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
"name": "billing_pipeline",
"event_log": {
"catalog": "catalog_name",
"schema": "schema_name",
"name": "event_log_table_name"
}
}
Platsen för händelseloggen fungerar också som schemaplats för alla frågor för automatisk inläsning i pipelinen. Databricks rekommenderar att du skapar en vy över händelseloggtabellen innan du ändrar behörigheterna, eftersom vissa beräkningsinställningar kan ge användarna åtkomst till schemametadata om händelseloggtabellen delas direkt. Följande exempelsyntax skapar en vy i en händelseloggtabell och används i exempelhändelseloggfrågorna som ingår i den här artikeln.
CREATE VIEW event_log_raw
AS SELECT * FROM catalog_name.schema_name.event_log_table_name;
Varje instans av en pipelinekörning kallas för en uppdatering. Du vill ofta extrahera information för den senaste uppdateringen. Kör följande fråga för att hitta identifieraren för den senaste uppdateringen och spara den i den latest_update
tillfälliga vyn. Den här vyn används i exempelfrågorna i händelseloggen som ingår i den här artikeln:
CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;
Inom Unity Catalog stöder vyer strömmande frågor. I följande exempel används Structured Streaming för att fråga en vy som definierats ovanpå en händelseloggtabell:
df = spark.readStream.table("event_log_raw")
Ägaren av pipelinen kan publicera händelseloggen som en offentlig Delta-tabell genom att växla alternativet Publish event log to metastore
i avsnittet Avancerat i pipelinekonfigurationen. Du kan också ange ett nytt tabellnamn, en katalog och ett schema för händelseloggen.
Hämta härkomstinformation från händelseloggen
Händelser som innehåller information om ursprung har händelsetypen flow_definition
. Objektet details:flow_definition
innehåller output_dataset
och input_datasets
som definierar varje relation i diagrammet.
Du kan använda följande fråga för att extrahera indata- och utdatauppsättningarna för att se ursprungsinformation:
SELECT
details:flow_definition.output_dataset as output_dataset,
details:flow_definition.input_datasets as input_dataset
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_definition'
AND
origin.update_id = latest_update.id
output_dataset |
input_datasets |
---|---|
customers |
null |
sales_orders_raw |
null |
sales_orders_cleaned |
["customers", "sales_orders_raw"] |
sales_order_in_la |
["sales_orders_cleaned"] |
Frågedatakvalitet från händelseloggen
Om du definierar förväntningar på datauppsättningar i pipelinen lagras datakvalitetsmåtten i details:flow_progress.data_quality.expectations
-objektet. Händelser som innehåller information om datakvalitet har händelsetypen flow_progress
. I följande exempel frågar vi efter datakvalitetsmetrik för den senaste pipelineuppdateringen.
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details :flow_progress :data_quality :expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name
dataset |
expectation |
passing_records |
failing_records |
---|---|---|---|
sales_orders_cleaned |
valid_order_number |
4083 | 0 |
Hämta Auto Loaders händelser från händelseloggen
DLT genererar händelser när autoinläsaren bearbetar filer. För händelser med automatisk inläsning är event_type
operation_progress
och details:operation_progress:type
är antingen AUTO_LOADER_LISTING
eller AUTO_LOADER_BACKFILL
.
details:operation_progress
-objektet innehåller även fälten status
, duration_ms
, auto_loader_details:source_path
och auto_loader_details:num_files_listed
.
I följande exempel begärs Auto Loader-händelser för den senaste uppdateringen.
SELECT
timestamp,
details:operation_progress.status,
details:operation_progress.type,
details:operation_progress:auto_loader_details
FROM
event_log_raw,
latest_update
WHERE
event_type like 'operation_progress'
AND
origin.update_id = latest.update_id
AND
details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL')
Övervaka kvarvarande data genom att fråga händelseloggen
DLT spårar hur mycket data som finns i kvarvarande uppgifter i details:flow_progress.metrics.backlog_bytes
-objektet. Händelser som innehåller mått för kvarvarande uppgifter har händelsetypen flow_progress
. Följande exempel hämtar backlog-mått för den senaste pipelineuppdateringen.
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id
Obs
Måtten för kvarvarande uppgifter kanske inte är tillgängliga beroende på pipelinens datakällatyp och Databricks Runtime-version.
Övervaka förbättrade händelser för autoskalning från händelseloggen för pipelines där serverlös funktion inte är aktiverad
För DLT-pipelines som inte använder serverlös beräkning, registrerar händelseloggen klusterförändringar när avancerad autoskalning är aktiverad i dina pipelines. Händelser som innehåller information om förbättrad autoskalning har händelsetypen autoscale
. Informationen om en klusterstorleksändringsbegäran lagras i objektet details:autoscale
. I följande exempel frågar vi om förfrågningar gällande storleksändring för det förbättrade autoskalningsklustret sedan den senaste uppdateringen av pipelinen.
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id
Övervaka beräkningsresursanvändning
cluster_resources
händelser ger mått på antalet aktivitetsfack i klustret, hur mycket dessa aktivitetsfack används och hur många aktiviteter som väntar på att schemaläggas.
När förbättrad autoskalning är aktiverad innehåller cluster_resources
händelser även mått för algoritmen för automatisk skalning, inklusive latest_requested_num_executors
och optimal_num_executors
. Händelserna visar också status för algoritmen som olika tillstånd, till exempel CLUSTER_AT_DESIRED_SIZE
, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS
och BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION
.
Dessa uppgifter kan visas tillsammans med autoskalningshändelserna för att ge en helhetsbild av förbättrad autoskalning.
I följande exempel frågas efter historiken för aktivitetsköstorleken vid den senaste pipelineuppdateringen.
SELECT
timestamp,
Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
I följande exempel efterfrågas användningshistoriken för den senaste pipelineuppdateringen:
SELECT
timestamp,
Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
I följande exempel efterfrågas historiken över antalet exekutorer, tillsammans med mått som endast är tillgängliga för förbättrade autoscalingspipelines, inklusive antalet exekutorer som begärdes av algoritmen i den senaste begäran, det optimala antalet exekutorer som rekommenderas av algoritmen baserat på de senaste måtten och autoskalningsalgoritmens tillstånd.
SELECT
timestamp,
Double(details :cluster_resources.num_executors) as current_executors,
Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
Granska DLT-pipelines
Du kan använda DLT-händelseloggposter och andra Azure Databricks-granskningsloggar för att få en fullständig bild av hur data uppdateras i DLT.
DLT använder autentiseringsuppgifterna för pipelineägaren för att köra uppdateringar. Du kan ändra de autentiseringsuppgifter som används genom att uppdatera pipelineägaren. DLT registrerar användaren för åtgärder på pipelinen, inklusive skapande av pipeline, ändringar i konfigurationen och utlösande uppdateringar.
Mer information om granskningshändelser i Unity Catalog finns i Unity Catalog-händelser.
Fråga efter användaråtgärder i händelseloggen
Du kan använda händelseloggen för att granska händelser, till exempel användaråtgärder. Händelser som innehåller information om användaråtgärder har händelsetypen user_action
.
Information om åtgärden lagras i user_action
-objektet i fältet details
. Använd följande fråga för att skapa en granskningslogg med användarhändelser. För att skapa den event_log_raw
-vy som används i den här frågan, se Fråga händelseloggen.
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp |
action |
user_name |
---|---|---|
2021-05-20T19:36:03.517+0000 | START |
user@company.com |
2021-05-20T19:35:59.913+0000 | CREATE |
user@company.com |
2021-05-27T00:35:51.971+0000 | START |
user@company.com |
Körinformation
Du kan visa driftinformation för en pipelineuppdatering, till exempel Databricks Runtime-versionen för uppdateringen.
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version |
---|
11.0 |