Skapa en datapipeline från slutpunkt till slutpunkt i Databricks
Den här artikeln visar hur du skapar och distribuerar en pipeline för databearbetning från slutpunkt till slutpunkt, inklusive hur du matar in rådata, transformerar data och kör analyser på bearbetade data.
Anteckning
Även om den här artikeln visar hur du skapar en fullständig datapipeline med databricks notebook-filer och ett Azure Databricks-jobb för att samordna ett arbetsflöde, rekommenderar Databricks att du använder DLT-, ett deklarativt gränssnitt för att skapa tillförlitliga, underhållsbara och testbara databearbetningspipelines.
Vad är en datapipeline?
En datapipeline implementerar de steg som krävs för att flytta data från källsystem, transformera dessa data baserat på krav och lagra data i ett målsystem. En datapipeline innehåller alla processer som krävs för att omvandla rådata till förberedda data som användarna kan använda. En datapipeline kan till exempel förbereda data så att dataanalytiker och dataforskare kan extrahera värde från data via analys och rapportering.
Ett arbetsflöde för extrahering, transformering och inläsning (ETL) är ett vanligt exempel på en datapipeline. Vid ETL-bearbetning matas data in från källsystem och skrivs till ett mellanlagringsområde, transformeras baserat på krav (säkerställa datakvalitet, deduplicera poster och så vidare) och skrivs sedan till ett målsystem, till exempel ett informationslager eller en datasjö.
Steg för datapipeline
För att hjälpa dig att komma igång med att skapa datapipelines i Azure Databricks går exemplet i den här artikeln igenom hur du skapar ett arbetsflöde för databearbetning:
- Använd Azure Databricks-funktioner för att utforska en rådatauppsättning.
- Skapa en Databricks-notebook för att importera rådata och skriva den råa datan till en måltabell.
- Skapa en Databricks-anteckningsbok för att bearbeta rådata och skriva de bearbetade data till en måltabell.
- Skapa en Databricks-anteckningsbok för att köra frågor mot den transformerade datan.
- Automatisera datapipelinen med ett Azure Databricks-jobb.
Krav
- Du är inloggad i Azure Databricks och på arbetsytan för Datavetenskap och Teknik.
- Du har behörighet att skapa en beräkningsresurs eller åtkomst till en beräkningsresurs.
- (Valfritt) Om du vill publicera tabeller till Unity Catalog måste du skapa en katalog och ett schema i Unity Catalog.
Exempel: Million Song-datauppsättning
Datauppsättningen som används i det här exemplet är en delmängd av Datauppsättningen Million Song, en samling funktioner och metadata för samtida musikspår. Den här datamängden är tillgänglig i exempeldatauppsättningarna som ingår i din Azure Databricks-arbetsyta.
steg 1: Skapa en beräkningsresurs
Om du vill utföra databearbetningen och analysen i det här exemplet skapar du en beräkningsresurs för att köra kommandon.
Kommentar
Eftersom det här exemplet använder en exempeldatauppsättning som lagras i DBFS och rekommenderar att du bevarar tabeller för att Unity Catalogskapar du en beräkningsresurs som konfigurerats med dedikerat åtkomstläge. Dedikerat åtkomstläge ger fullständig åtkomst till DBFS samtidigt som åtkomst till Unity Catalog aktiveras. Se Metodtips för DBFS och Unity Catalog.
- Klicka på Beräkna i sidofältet.
- På sidan Beräkning klickar du på Skapa beräkning.
- På den nya beräkningssidan anger du ett unikt namn för beräkningsresursen.
- Under Avanceratväxlar du inställningen för åtkomstläge till Manuell och väljer sedan Dedikerad.
- I Enskild användare eller gruppväljer du ditt användarnamn.
- Lämna de återstående värdena i standardtillståndet och klicka på Skapa.
Mer information om Databricks-beräkningsresurser finns i Compute.
Steg 2: Utforska källdata
Information om hur du använder Azure Databricks-gränssnittet för att utforska rådata finns i Utforska källdata för en datapipeline. Om du vill gå direkt till att mata in och förbereda data fortsätter du till Steg 3: Mata in rådata.
Steg 3: Införa rådata
I det här steget läser du in rådata i en tabell för att göra dem tillgängliga för vidare bearbetning. För att hantera datatillgångar på Databricks-plattformen, till exempel tabeller, rekommenderar Databricks Unity Catalog. Men om du inte har behörighet att skapa den katalog och schema som krävs för att publicera tabeller till Unity Catalog kan du fortfarande utföra följande steg genom att publicera tabeller till Hive-metaarkivet.
Databricks rekommenderar att du använder Auto Loader för att mata in data. Automatisk inläsning identifierar och bearbetar automatiskt nya filer när de tas emot i molnobjektlagring.
Du kan konfigurera automatisk inläsning för att automatiskt identifiera schemat för inlästa data, så att du kan initiera tabeller utan att uttryckligen deklarera dataschemat och utveckla tabellschemat när nya kolumner introduceras. Detta eliminerar behovet av att manuellt spåra och tillämpa schemaändringar över tid. Databricks rekommenderar schemainferens när du använder Auto Loader. Men som du ser i datautforskningssteget innehåller låtdata inte rubrikinformation. Eftersom rubriken inte lagras med data måste du uttryckligen definiera schemat, som du ser i nästa exempel.
I sidofältet klickar du på
Ny och väljer Anteckningsbok på menyn. Dialogrutan Skapa anteckningsbok visas.
Ange ett namn på anteckningsboken, till exempel
Ingest songs data
. Som standard:- Python är det valda språket.
- Anteckningsboken är kopplad till den senaste beräkningsresursen som du använde. I det här fallet handlar det om resursen som du skapade i steg 1: Skapa en beräkningsresurs.
Ange följande i den första cellen i anteckningsboken:
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define variables used in the code below file_path = "/databricks-datasets/songs/data-001/" table_name = "<table-name>" checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data" schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path) .writeStream .option("checkpointLocation", checkpoint_path) .trigger(availableNow=True) .toTable(table_name) )
Om du använder Unity Catalog ersätter
<table-name>
du med en katalog, ett schema och ett tabellnamn för att innehålla de inmatade posterna (till exempeldata_pipelines.songs_data.raw_song_data
). Annars ersätter du<table-name>
med namnet på en tabell som ska innehålla de inmatade posterna,raw_song_data
till exempel .Ersätt
<checkpoint-path>
med en sökväg till en katalog i DBFS för att underhålla kontrollpunktsfiler,/tmp/pipeline_get_started/_checkpoint/song_data
till exempel .Klicka på
och välj Kör cell. Det här exemplet definierar dataschemat med hjälp av informationen från
README
, matar in låtdata från alla filer som finns ifile_path
och skriver data till tabellen som anges avtable_name
.
Steg 4: Förbereda rådata
För att förbereda rådata för analys transformerar följande steg sångernas rådata genom att filtrera bort onödiga kolumner och lägga till ett nytt fält som innehåller en tidsstämpel för när den nya posten skapas.
I sidofältet klickar du på
Ny och väljer Anteckningsbok på menyn. Dialogrutan Skapa anteckningsbok visas.
Ange ett namn på anteckningsboken. Exempel:
Prepare songs data
Ändra standardspråket till SQL.Ange följande i den första cellen i anteckningsboken:
CREATE OR REPLACE TABLE <table-name> ( artist_id STRING, artist_name STRING, duration DOUBLE, release STRING, tempo DOUBLE, time_signature DOUBLE, title STRING, year DOUBLE, processed_time TIMESTAMP ); INSERT INTO <table-name> SELECT artist_id, artist_name, duration, release, tempo, time_signature, title, year, current_timestamp() FROM <raw-songs-table-name>
Om du använder Unity Catalog ersätter
<table-name>
du med en katalog, ett schema och ett tabellnamn för att innehålla de filtrerade och transformerade posterna (till exempeldata_pipelines.songs_data.prepared_song_data
). Annars ersätter du<table-name>
med namnet på en tabell som ska innehålla de filtrerade och transformerade posterna (till exempelprepared_song_data
).Ersätt
<raw-songs-table-name>
med namnet på tabellen som innehåller de råa låtarna som matades in i föregående steg.Klicka på
och välj Kör cell.
Steg 5: Hämta transformerade data
I det här steget utökar du bearbetningspipelinen genom att lägga till frågor för att analysera låtdata. Dessa frågor använder de förberedda poster som skapades i föregående steg.
I sidofältet klickar du på
Ny och väljer Anteckningsbok på menyn. Dialogrutan Skapa anteckningsbok visas.
Ange ett namn på anteckningsboken. Exempel:
Analyze songs data
Ändra standardspråket till SQL.Ange följande i den första cellen i anteckningsboken:
-- Which artists released the most songs each year? SELECT artist_name, count(artist_name) AS num_songs, year FROM <prepared-songs-table-name> WHERE year > 0 GROUP BY artist_name, year ORDER BY num_songs DESC, year DESC
Ersätt
<prepared-songs-table-name>
med namnet på tabellen som innehåller förberedda data. Exempel:data_pipelines.songs_data.prepared_song_data
Klicka på
menyn cellåtgärder, välj Lägg till cell nedan och ange följande i den nya cellen:
-- Find songs for your DJ list SELECT artist_name, title, tempo FROM <prepared-songs-table-name> WHERE time_signature = 4 AND tempo between 100 and 140;
Ersätt
<prepared-songs-table-name>
med namnet på den förberedda tabellen som skapades i föregående steg. Exempel:data_pipelines.songs_data.prepared_song_data
Om du vill köra frågorna och visa utdata klickar du på Kör alla.
Steg 6: Skapa ett Azure Databricks-jobb för att köra pipelinen
Du kan skapa ett arbetsflöde för att automatisera körningen av datainmatnings-, bearbetnings- och analysstegen med hjälp av ett Azure Databricks-jobb.
- Gör något av följande i din arbetsyta för Datavetenskap & Engineering:
- Klicka på
Arbetsflöden i sidofältet och klicka på .
- I sidofältet klickar du på
Nytt och väljer Jobb.
- Klicka på
- I dialogrutan Aktivitet på fliken Uppgifter ersätter du Lägg till ett namn för jobbet... med jobbets namn. Till exempel "Songs workflow".
- I Uppgiftsnamn anger du ett namn för den första aktiviteten, till exempel
Ingest_songs_data
. - I Typ väljer du aktivitetstypen Notebook .
- I Källa väljer du Arbetsyta.
- I fältet Sökväg använder du filläsaren för att hitta anteckningsboken för datainmatning och klickar sedan på Bekräfta.
- I Computeväljer du den beräkningsresurs som du skapade i
Create a compute resource
steget. - Klicka på Skapa.
- Klicka
under den uppgift som du nyss skapade och välj Anteckningsbok.
- I Uppgiftsnamn anger du ett namn för aktiviteten, till exempel
Prepare_songs_data
. - I Typ väljer du aktivitetstypen Notebook .
- I Källa väljer du Arbetsyta.
- Använd filläsaren för att hitta anteckningsboken för förberedelse av data, klicka på anteckningsbokens namn och klicka på Bekräfta.
- I Computeväljer du den beräkningsresurs som du skapade i
Create a compute resource
steget. - Klicka på Skapa.
- Klicka
under den uppgift som du nyss skapade och välj Anteckningsbok.
- I Uppgiftsnamn anger du ett namn för aktiviteten, till exempel
Analyze_songs_data
. - I Typ väljer du aktivitetstypen Notebook .
- I Källa väljer du Arbetsyta.
- Använd filläsaren för att hitta anteckningsboken för dataanalys, klicka på anteckningsbokens namn och klicka på Bekräfta.
- I Computeväljer du den beräkningsresurs som du skapade i
Create a compute resource
steget. - Klicka på Skapa.
- Om du vill köra arbetsflödet klickar du på
. Om du vill visa information om körningen klickar du på länken i kolumnen Starttid för körningen i jobbkörningsvyn. Klicka på varje aktivitet om du vill visa information om aktivitetskörningen.
- Om du vill visa resultatet när arbetsflödet är klart klickar du på den slutliga dataanalysuppgiften. Sidan Utdata visas och visar frågeresultatet.
Steg 7: Schemalägg datapipelinejobbet
Anteckning
Om du vill demonstrera hur du använder ett Azure Databricks-jobb för att orkestrera ett schemalagt arbetsflöde separerar det här komma igång-exemplet stegen för inmatning, förberedelse och analys i separata notebook-filer, och varje notebook-fil används sedan för att skapa en uppgift i jobbet. Om all bearbetning finns i en enda notebook-fil kan du enkelt schemalägga notebook-filen direkt från användargränssnittet för Azure Databricks Notebook. Se Skapa och hantera schemalagda notebook-jobb.
Ett vanligt krav är att köra en datapipeline enligt ett schema. Så här definierar du ett schema för jobbet som kör pipelinen:
- Klicka på
Arbetsflöden i sidofältet.
- Klicka på jobbnamnet i kolumnen Namn. Sidopanelen visar jobbinformationen.
- Klicka på Lägg till utlösare i panelen Jobbinformation och välj Schemalagd i Utlösartyp.
- Ange period, starttid och tidszon. Du kan också markera kryssrutan Visa Cron-syntax för att visa och redigera schemat i Quartz Cron-syntax.
- Klicka på Spara.
Läs mer
- Mer information om Databricks-notebook-filer finns i Introduktion till Databricks-notebook-filer.
- Mer information om Azure Databricks-jobb finns i Vad är jobb?.
- Mer information om Delta Lake finns i Vad är Delta Lake?.
- Mer information om databehandlingspipelines med DLT finns i Vad är DLT?.