Dela via


Övervaka DLT-pipelines

Den här artikeln beskriver hur du använder inbyggda funktioner för övervakning och observerbarhet för DLT-pipelines. Dessa funktioner stöder uppgifter som:

Information om hur du inspekterar och diagnostiserar frågeprestanda finns i Access-frågehistorik för DLT-pipelines. Den här funktionen finns i offentlig förhandsversion.

Lägga till e-postaviseringar för pipelinehändelser

Du kan konfigurera en eller flera e-postadresser för att ta emot meddelanden när följande inträffar:

  • Pipelineuppdateringen har slutförts framgångsrikt.
  • En pipelineuppdatering misslyckas, antingen med ett återförsöksbart fel eller ett fel som inte kan åtgärdas med ett nytt försök. Välj det här alternativet om du vill få ett meddelande om alla pipelinefel.
  • En pipelineuppdatering misslyckas med ett fel som inte går att försöka igen (allvarligt). Välj det här alternativet om du bara vill få ett meddelande när ett fel som inte kan försökas igen inträffar.
  • Ett enskilt dataflöde misslyckas.

Så här konfigurerar du e-postaviseringar när du skapar eller redigerar en pipeline:

  1. Klicka på Lägg till meddelande.
  2. Ange en eller flera e-postadresser för att ta emot meddelanden.
  3. Klicka på kryssrutan för varje meddelandetyp som ska skickas till de konfigurerade e-postadresserna.
  4. Klicka på Lägg till meddelande.

Vilken pipelineinformation är tillgänglig i användargränssnittet?

Pipelinediagrammet visas så snart en uppdatering av en pipeline har startat framgångsrikt. Pilar representerar beroenden mellan datauppsättningar i pipelinen. Som standardinställning visar detaljsidan för pipeline den senaste uppdateringen för tabellen, men du kan välja äldre uppdateringar från en nedrullningsbar meny.

Information omfattar pipeline-ID, källkod, beräkningskostnad, produktutgåva och kanalen som konfigurerats för pipelinen.

Om du vill se en tabellvy över datauppsättningar klickar du på fliken Lista. Med List-vyn kan du se alla datauppsättningar i pipelinen som visas som en rad i en tabell och är användbart när din pipeline DAG är för stor för att visualisera i Graph-vyn. Du kan styra de datauppsättningar som visas i tabellen med hjälp av flera filter, till exempel datauppsättningens namn, typ och status. Om du vill växla tillbaka till DAG-visualiseringen klickar du på Graph.

Kör som-användare är pipelineägaren och pipelineuppdateringar körs med den här användarens behörigheter. För att ändra informationen för run as användaren klickar du på Behörigheter och byter ut pipelineägaren.

Hur kan du visa datamängdsinformation?

Om du klickar på en datauppsättning i pipelinediagrammet eller datamängdslistan visas information om datamängden. Information omfattar datamängdsschemat, datakvalitetsmått och en länk till källkoden som definierar datamängden.

Visa uppdateringshistorik

Om du vill visa historik och status för pipelineuppdateringar klickar du på den nedrullningsbara menyn uppdateringshistorik i det övre fältet.

Välj uppdateringen i den nedrullningsbara menyn för att visa grafen, informationen och händelserna för en uppdatering. Om du vill återgå till den senaste uppdateringen klickar du på Visa den senaste uppdateringen.

Visa strömningsmått

Viktig

Observabilitet för strömning av DLT är i offentlig förhandsversion .

Du kan visa strömmande mått från de datakällor som stöds av Spark Structured Streaming, som Apache Kafka, Amazon Kinesis, Auto Loader och Delta-tabeller, för varje strömningsflöde i din DLT-pipeline. Mått visas som diagram i DLT-användargränssnittets högra fönster och inkluderar eftersläpning i sekunder, eftersläpning i byte, eftersläpning i poster och eftersläpningsfiler. Diagrammen visar de maximala värdena aggregerade per minut och ett verktygstips visar maximala värden när du för muspekaren över diagrammet. Data är begränsade till de senaste 48 timmarna från den aktuella tiden.

Tabeller i din pipeline med tillgängliga strömningsmått visar ikonen DLT-diagramikonen när du visar pipeline-DAG:en i användargränssnittet i Graph-vyn. Om du vill visa strömningsmåtten klickar du på ikonen DLT-diagram för att visa det strömmande måttdiagrammet på fliken Flöden i den högra rutan. Du kan också använda ett filter för att endast visa tabeller med strömmande mått genom att klicka på List och sedan klicka på Har strömningsmått.

Varje strömmande källa stöder endast specifika mått. Mått som inte stöds av en strömmande källa är inte tillgängliga för visning i användargränssnittet. I följande tabell visas de mått som är tillgängliga för strömmande källor som stöds:

källa kvarvarande byte poster i eftersläpning kösekunder kvarvarande filer
Kafka
Kinesis
Delta
Automatisk inläsning
Google Pub/Sub

Vad är DLT-händelseloggen?

DLT-händelseloggen innehåller all information som rör en pipeline, inklusive granskningsloggar, datakvalitetskontroller, pipelineförlopp och data härkomst. Du kan använda händelseloggen för att spåra, förstå och övervaka tillståndet för dina datapipelines.

Du kan visa händelseloggposter i DLT-användargränssnittet, DLT APIeller genom att fråga händelseloggen direkt. Det här avsnittet fokuserar på att fråga händelseloggen direkt.

Du kan också definiera anpassade åtgärder som ska köras när händelser loggas, till exempel att skicka aviseringar, med händelsekrokar.

Viktig

Ta inte bort händelseloggen eller den överordnade katalogen eller schemat där händelseloggen publiceras. Om du tar bort händelseloggen kan det leda till att pipelinen inte uppdateras under framtida körningar.

Händelseloggschema

I följande tabell beskrivs schemat för händelseloggen. Vissa av dessa fält innehåller JSON-data som kräver parsning för att utföra vissa frågor, till exempel fältet details. Azure Databricks stöder :-operatorn för att parsa JSON-fält. Se : (kolon) operatör.

Fält Beskrivning
id En unik identifierare för händelseloggposten.
sequence Ett JSON-dokument som innehåller metadata för att identifiera och beställa händelser.
origin Ett JSON-dokument som innehåller metadata för händelsens ursprung, till exempel molnleverantören, molnproviderregionen, user_id, pipeline_ideller pipeline_type för att visa var pipelinen skapades, antingen DBSQL eller WORKSPACE.
timestamp Den tid då händelsen spelades in.
message Ett läsbart meddelande som beskriver händelsen.
level Händelsetypen, till exempel INFO, WARN, ERROReller METRICS.
maturity_level Händelseschemats stabilitet. Möjliga värden är:
  • STABLE: Schemat är stabilt och ändras inte.
  • NULL: Schemat är stabilt och ändras inte. Värdet kan vara NULL om posten skapades innan fältet maturity_level lades till (version 2022.37).
  • EVOLVING: Schemat är inte stabilt och kan ändras.
  • DEPRECATED: Schemat är föråldrat och DLT-körtiden kan sluta producera den här händelsen när som helst.
error Om ett fel har uppstått finns information som beskriver felet.
details Ett JSON-dokument som innehåller strukturerad information om händelsen. Det här är det primära fältet som används för att analysera händelser.
event_type Händelsetyp

Sök i händelseloggen

Obs

I det här avsnittet beskrivs standardbeteendet och syntaxen för att arbeta med händelseloggar för pipelines som konfigurerats med Unity Catalog och standardpubliceringsläget.

Som standard skriver DLT händelseloggen till en dold Delta-tabell i standardkatalogen och schemat som konfigurerats för pipelinen. Även om tabellen är dold kan den fortfarande efterfrågas av alla tillräckligt privilegierade användare. Som standard kan endast ägaren av pipelinen köra frågor mot händelseloggtabellen.

Som standard formateras namnet på den dolda händelseloggen som event_log_{pipeline_id}, där pipeline-ID:t är det systemtilldelade UUID:t med streckat ersatt av understreck.

Du kan interagera med JSON-konfigurationen för att publicera händelseloggen. När du publicerar en händelselogg anger du namnet på händelseloggen och kan också ange en katalog och ett schema, som i följande exempel:

{
  "id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
  "name": "billing_pipeline",
  "event_log": {
    "catalog": "catalog_name",
    "schema": "schema_name",
    "name": "event_log_table_name"
  }
}

Platsen för händelseloggen fungerar också som schemaplats för alla frågor för automatisk inläsning i pipelinen. Databricks rekommenderar att du skapar en vy över händelseloggtabellen innan du ändrar behörigheterna, eftersom vissa beräkningsinställningar kan ge användarna åtkomst till schemametadata om händelseloggtabellen delas direkt. Följande exempelsyntax skapar en vy i en händelseloggtabell och används i exempelhändelseloggfrågorna som ingår i den här artikeln.

CREATE VIEW event_log_raw
AS SELECT * FROM catalog_name.schema_name.event_log_table_name;

Varje instans av en pipelinekörning kallas för en uppdatering. Du vill ofta extrahera information för den senaste uppdateringen. Kör följande fråga för att hitta identifieraren för den senaste uppdateringen och spara den i den latest_update tillfälliga vyn. Den här vyn används i exempelfrågorna i händelseloggen som ingår i den här artikeln:

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

Inom Unity Catalog stöder vyer strömmande frågor. I följande exempel används Structured Streaming för att fråga en vy som definierats ovanpå en händelseloggtabell:

df = spark.readStream.table("event_log_raw")

Ägaren av pipelinen kan publicera händelseloggen som en offentlig Delta-tabell genom att växla alternativet Publish event log to metastore i avsnittet Avancerat i pipelinekonfigurationen. Du kan också ange ett nytt tabellnamn, en katalog och ett schema för händelseloggen.

Hämta härkomstinformation från händelseloggen

Händelser som innehåller information om ursprung har händelsetypen flow_definition. Objektet details:flow_definition innehåller output_dataset och input_datasets som definierar varje relation i diagrammet.

Du kan använda följande fråga för att extrahera indata- och utdatauppsättningarna för att se ursprungsinformation:

SELECT
  details:flow_definition.output_dataset as output_dataset,
  details:flow_definition.input_datasets as input_dataset
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'flow_definition'
  AND
  origin.update_id = latest_update.id
output_dataset input_datasets
customers null
sales_orders_raw null
sales_orders_cleaned ["customers", "sales_orders_raw"]
sales_order_in_la ["sales_orders_cleaned"]

Frågedatakvalitet från händelseloggen

Om du definierar förväntningar på datauppsättningar i pipelinen lagras datakvalitetsmåtten i details:flow_progress.data_quality.expectations-objektet. Händelser som innehåller information om datakvalitet har händelsetypen flow_progress. I följande exempel frågar vi efter datakvalitetsmetrik för den senaste pipelineuppdateringen.

SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name
dataset expectation passing_records failing_records
sales_orders_cleaned valid_order_number 4083 0

Hämta Auto Loaders händelser från händelseloggen

DLT genererar händelser när autoinläsaren bearbetar filer. För händelser med automatisk inläsning är event_typeoperation_progress och details:operation_progress:type är antingen AUTO_LOADER_LISTING eller AUTO_LOADER_BACKFILL. details:operation_progress-objektet innehåller även fälten status, duration_ms, auto_loader_details:source_pathoch auto_loader_details:num_files_listed.

I följande exempel begärs Auto Loader-händelser för den senaste uppdateringen.

SELECT
  timestamp,
  details:operation_progress.status,
  details:operation_progress.type,
  details:operation_progress:auto_loader_details
FROM
  event_log_raw,
  latest_update
WHERE
  event_type like 'operation_progress'
  AND
  origin.update_id = latest.update_id
  AND
  details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL')

Övervaka kvarvarande data genom att fråga händelseloggen

DLT spårar hur mycket data som finns i kvarvarande uppgifter i details:flow_progress.metrics.backlog_bytes-objektet. Händelser som innehåller mått för kvarvarande uppgifter har händelsetypen flow_progress. Följande exempel hämtar backlog-mått för den senaste pipelineuppdateringen.

SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id

Obs

Måtten för kvarvarande uppgifter kanske inte är tillgängliga beroende på pipelinens datakällatyp och Databricks Runtime-version.

Övervaka förbättrade händelser för autoskalning från händelseloggen för pipelines där serverlös funktion inte är aktiverad

För DLT-pipelines som inte använder serverlös beräkning, registrerar händelseloggen klusterförändringar när avancerad autoskalning är aktiverad i dina pipelines. Händelser som innehåller information om förbättrad autoskalning har händelsetypen autoscale. Informationen om en klusterstorleksändringsbegäran lagras i objektet details:autoscale. I följande exempel frågar vi om förfrågningar gällande storleksändring för det förbättrade autoskalningsklustret sedan den senaste uppdateringen av pipelinen.

SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

Övervaka beräkningsresursanvändning

cluster_resources händelser ger mått på antalet aktivitetsfack i klustret, hur mycket dessa aktivitetsfack används och hur många aktiviteter som väntar på att schemaläggas.

När förbättrad autoskalning är aktiverad innehåller cluster_resources händelser även mått för algoritmen för automatisk skalning, inklusive latest_requested_num_executorsoch optimal_num_executors. Händelserna visar också status för algoritmen som olika tillstånd, till exempel CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSoch BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION. Dessa uppgifter kan visas tillsammans med autoskalningshändelserna för att ge en helhetsbild av förbättrad autoskalning.

I följande exempel frågas efter historiken för aktivitetsköstorleken vid den senaste pipelineuppdateringen.

SELECT
  timestamp,
  Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

I följande exempel efterfrågas användningshistoriken för den senaste pipelineuppdateringen:

SELECT
  timestamp,
  Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

I följande exempel efterfrågas historiken över antalet exekutorer, tillsammans med mått som endast är tillgängliga för förbättrade autoscalingspipelines, inklusive antalet exekutorer som begärdes av algoritmen i den senaste begäran, det optimala antalet exekutorer som rekommenderas av algoritmen baserat på de senaste måtten och autoskalningsalgoritmens tillstånd.

SELECT
  timestamp,
  Double(details :cluster_resources.num_executors) as current_executors,
  Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

Granska DLT-pipelines

Du kan använda DLT-händelseloggposter och andra Azure Databricks-granskningsloggar för att få en fullständig bild av hur data uppdateras i DLT.

DLT använder autentiseringsuppgifterna för pipelineägaren för att köra uppdateringar. Du kan ändra de autentiseringsuppgifter som används genom att uppdatera pipelineägaren. DLT registrerar användaren för åtgärder på pipelinen, inklusive skapande av pipeline, ändringar i konfigurationen och utlösande uppdateringar.

Mer information om granskningshändelser i Unity Catalog finns i Unity Catalog-händelser.

Fråga efter användaråtgärder i händelseloggen

Du kan använda händelseloggen för att granska händelser, till exempel användaråtgärder. Händelser som innehåller information om användaråtgärder har händelsetypen user_action.

Information om åtgärden lagras i user_action-objektet i fältet details. Använd följande fråga för att skapa en granskningslogg med användarhändelser. För att skapa den event_log_raw-vy som används i den här frågan, se Fråga händelseloggen.

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
2021-05-20T19:36:03.517+0000 START user@company.com
2021-05-20T19:35:59.913+0000 CREATE user@company.com
2021-05-27T00:35:51.971+0000 START user@company.com

Körinformation

Du kan visa driftinformation för en pipelineuppdatering, till exempel Databricks Runtime-versionen för uppdateringen.

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version
11.0