Een end-to-end gegevenspijplijn bouwen in Databricks
In dit artikel leest u hoe u een end-to-end pijplijn voor gegevensverwerking maakt en implementeert, waaronder het opnemen van onbewerkte gegevens, het transformeren van de gegevens en het uitvoeren van analyses op de verwerkte gegevens.
Notitie
Hoewel dit artikel laat zien hoe u een volledige gegevenspijplijn maakt met behulp van Databricks notebooks en een Azure Databricks -taak voor het organiseren van een werkstroom, raadt Databricks aan om Delta Live Tableste gebruiken, een declaratieve interface voor het bouwen van betrouwbare, onderhoudbare en testbare pijplijnen voor gegevensverwerking.
Wat is een gegevenspijplijn?
Een gegevenspijplijn implementeert de stappen die nodig zijn om gegevens te verplaatsen van bronsystemen, die gegevens te transformeren op basis van vereisten en de gegevens op te slaan in een doelsysteem. Een gegevenspijplijn bevat alle processen die nodig zijn om onbewerkte gegevens om te zetten in voorbereide gegevens die gebruikers kunnen gebruiken. Een gegevenspijplijn kan bijvoorbeeld gegevens voorbereiden, zodat gegevensanalisten en gegevenswetenschappers waarde uit de gegevens kunnen extraheren via analyse en rapportage.
Een ETL-werkstroom (extraheren, transformeren en laden) is een veelvoorkomend voorbeeld van een gegevenspijplijn. Bij ETL-verwerking worden gegevens opgenomen uit bronsystemen en naar een faseringsgebied geschreven, getransformeerd op basis van vereisten (gegevenskwaliteit, ontdubbeling van records, enzovoort) en vervolgens naar een doelsysteem zoals een datawarehouse of data lake geschreven.
Stappen voor gegevenspijplijn
Om u te helpen bij het bouwen van gegevenspijplijnen in Azure Databricks, wordt in het voorbeeld in dit artikel uitgelegd hoe u een werkstroom voor gegevensverwerking maakt:
- Gebruik Azure Databricks-functies om een onbewerkte gegevensset te verkennen.
- Maak een Databricks-notebook om onbewerkte brongegevens op te nemen en de onbewerkte gegevens naar een doeltabel te schrijven.
- Maak een Databricks-notebook om de onbewerkte brongegevens te transformeren en de getransformeerde gegevens naar een doeltabel te schrijven.
- Maak een Databricks-notebook om een query uit te voeren op de getransformeerde gegevens.
- Automatiseer de gegevenspijplijn met een Azure Databricks-taak.
Vereisten
- U bent aangemeld bij Azure Databricks en in de werkruimte Datawetenschap & Engineering.
- U bent gemachtigd om een cluster te maken of toegang te krijgen tot een cluster.
- (Optioneel) Als u tabellen wilt publiceren naar Unity Catalog, moet u een catalogus maken en schema in Unity Catalog.
Voorbeeld: Miljoen nummergegevensset
De gegevensset die in dit voorbeeld wordt gebruikt, is een subset van de Million Song Dataset, een verzameling functies en metagegevens voor hedendaagse muzieknummers. Deze gegevensset is beschikbaar in de voorbeeldgegevenssets die zijn opgenomen in uw Azure Databricks-werkruimte.
Stap 1: Een cluster maken
Als u de gegevensverwerking en -analyse in dit voorbeeld wilt uitvoeren, maakt u een cluster om de rekenresources op te geven die nodig zijn om opdrachten uit te voeren.
Notitie
Omdat in dit voorbeeld een voorbeeldgegevensset wordt gebruikt die is opgeslagen in DBFS en wordt aanbevolen om permanente tabellen te Unity Catalog, maakt u een cluster dat is geconfigureerd met -modus voor toegang tot één gebruiker. De toegangsmodus voor één gebruiker biedt volledige toegang tot DBFS en maakt ook toegang tot Unity Catalog mogelijk. Zie Aanbevolen procedures voor DBFS en Unity Catalog.
- Klik op Compute in de zijbalk.
- Klik op de pagina Compute op Cluster maken.
- Voer op de pagina Nieuw cluster een unieke naam in voor het cluster.
- Selecteer in Access-modusvoor één gebruiker.
- In Enkelvoudige gebruiker of service-principal toegang, selecteer uw gebruikersnaam.
- Laat de resterende waarden in de standaardstatus staan en klik op Cluster maken.
Zie Compute voor meer informatie over Databricks-clusters.
Stap 2: De brongegevens verkennen
Zie De brongegevens voor een gegevenspijplijn verkennen voor meer informatie over het gebruik van de Azure Databricks-interface om de onbewerkte brongegevens te verkennen. Als u rechtstreeks naar de opname en voorbereiding van de gegevens wilt gaan, gaat u verder met stap 3: de onbewerkte gegevens opnemen.
Stap 3: de onbewerkte gegevens opnemen
In deze stap laadt u de onbewerkte gegevens in een tabel om deze beschikbaar te maken voor verdere verwerking. Voor het beheren van gegevensassets op het Databricks-platform, zoals tabellen, raadt Databricks Unity Catalog aan. Als u echter geen machtigingen hebt om de vereiste catalogus en het schema te maken voor het publiceren van tabellen naar Unity Catalog, kunt u nog steeds de volgende stappen uitvoeren door tabellen te publiceren naar de Hive-metastore.
Databricks raadt aan om gegevens op te nemen. Automatisch laden detecteert en verwerkt automatisch nieuwe bestanden wanneer ze binnenkomen in de opslag van cloudobjecten.
U kunt automatisch laden configureren om het schema van geladen gegevens automatisch te detecteren, zodat u tabellen kunt initialiseren zonder expliciet het gegevensschema te declareren en het tabelschema te ontwikkelen naarmate er nieuwe kolommen worden geïntroduceerd. Hierdoor hoeft u geen schemawijzigingen handmatig bij te houden en toe te passen in de loop van de tijd. Databricks raadt schemadeductie aan bij het gebruik van automatische laadprogramma's. Zoals u echter in de stap voor gegevensverkenning ziet, bevatten de nummers geen headergegevens. Omdat de header niet is opgeslagen met de gegevens, moet u het schema expliciet definiëren, zoals wordt weergegeven in het volgende voorbeeld.
Klik in de zijbalk op
Nieuw en selecteer Notitieblok in het menu. Het dialoogvenster Notitieblok maken wordt weergegeven.
Voer bijvoorbeeld een naam in voor het notitieblok
Ingest songs data
. Standaard:- Python is de geselecteerde taal.
- Het notebook is gekoppeld aan het laatste cluster dat u hebt gebruikt. In dit geval is het cluster dat u in stap 1 hebt gemaakt: een cluster maken.
Voer het volgende in de eerste cel van het notebook in:
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define variables used in the code below file_path = "/databricks-datasets/songs/data-001/" table_name = "<table-name>" checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data" schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path) .writeStream .option("checkpointLocation", checkpoint_path) .trigger(availableNow=True) .toTable(table_name) )
Als u Unity Catalog gebruikt, vervangt u
<table-name>
door een catalogus, schema en tabelnaam om de opgenomen records (bijvoorbeelddata_pipelines.songs_data.raw_song_data
) te bevatten. Vervang anders<table-name>
door de naam van een tabel die de opgenomen records bevat, bijvoorbeeldraw_song_data
.Vervang
<checkpoint-path>
door een pad naar een map in DBFS om bijvoorbeeld/tmp/pipeline_get_started/_checkpoint/song_data
controlepuntbestanden te onderhouden.Klik op
en selecteer Uitvoeren Cel. In dit voorbeeld wordt het gegevensschema gedefinieerd met behulp van de gegevens uit de
README
, worden de nummers opgenomen uit alle bestanden infile_path
en worden de gegevens weggeschreven naar de tabel die is opgegeven doortable_name
.
Stap 4: De onbewerkte gegevens voorbereiden
Om de onbewerkte gegevens voor te bereiden op analyse, transformeren de volgende stappen de onbewerkte liedgegevens door overbodige kolommen te filteren en een nieuw veld toe te voegen dat een tijdstempel bevat voor het maken van het nieuwe record.
Klik in de zijbalk op
Nieuw en selecteer Notitieblok in het menu. Het dialoogvenster Notitieblok maken wordt weergegeven.
Voer een naam in voor het notitieblok. Bijvoorbeeld:
Prepare songs data
. Wijzig de standaardtaal in SQL.Voer het volgende in de eerste cel van het notebook in:
CREATE OR REPLACE TABLE <table-name> ( artist_id STRING, artist_name STRING, duration DOUBLE, release STRING, tempo DOUBLE, time_signature DOUBLE, title STRING, year DOUBLE, processed_time TIMESTAMP ); INSERT INTO <table-name> SELECT artist_id, artist_name, duration, release, tempo, time_signature, title, year, current_timestamp() FROM <raw-songs-table-name>
Als u Unity Catalog gebruikt, vervangt u
<table-name>
door een catalogus, schema en tabelnaam om de gefilterde en getransformeerde records (bijvoorbeelddata_pipelines.songs_data.prepared_song_data
) te bevatten. Vervang anders<table-name>
door de naam van een tabel die de gefilterde en getransformeerde records bevat (bijvoorbeeldprepared_song_data
).Vervang
<raw-songs-table-name>
door de naam van de tabel met de onbewerkte nummers die zijn opgenomen in de vorige stap.Klik op
en selecteer Uitvoeren Cel.
Stap 5: Een query uitvoeren op de getransformeerde gegevens
In deze stap breidt u de verwerkingspijplijn uit door query's toe te voegen om de nummersgegevens te analyseren. Deze query's maken gebruik van de voorbereide records die in de vorige stap zijn gemaakt.
Klik in de zijbalk op
Nieuw en selecteer Notitieblok in het menu. Het dialoogvenster Notitieblok maken wordt weergegeven.
Voer een naam in voor het notitieblok. Bijvoorbeeld:
Analyze songs data
. Wijzig de standaardtaal in SQL.Voer het volgende in de eerste cel van het notebook in:
-- Which artists released the most songs each year? SELECT artist_name, count(artist_name) AS num_songs, year FROM <prepared-songs-table-name> WHERE year > 0 GROUP BY artist_name, year ORDER BY num_songs DESC, year DESC
Vervang
<prepared-songs-table-name>
door de naam van de tabel met voorbereide gegevens. Bijvoorbeeld:data_pipelines.songs_data.prepared_song_data
.Klik op
omlaag in het menu celacties, selecteer Cel onder toevoegen en voer het volgende in de nieuwe cel in:
-- Find songs for your DJ list SELECT artist_name, title, tempo FROM <prepared-songs-table-name> WHERE time_signature = 4 AND tempo between 100 and 140;
Vervang
<prepared-songs-table-name>
door de naam van de voorbereide tabel die u in de vorige stap hebt gemaakt. Bijvoorbeeld:data_pipelines.songs_data.prepared_song_data
.Als u de query's wilt uitvoeren en de uitvoer wilt weergeven, klikt u op Alles uitvoeren.
Stap 6: Een Azure Databricks-taak maken om de pijplijn uit te voeren
U kunt een werkstroom maken om het uitvoeren van de stappen voor gegevensopname, verwerking en analyse te automatiseren met behulp van een Azure Databricks-taak.
- Voer in uw Datawetenschap & Engineering-werkruimte een van de volgende handelingen uit:
- Klik op
Werkstromen in de zijbalk en klik op .
- Klik in de zijbalk op
Nieuwe en selecteer Taak.
- Klik op
- Vervang in het taakdialoogvenster op het tabblad Taken een naam voor uw taak toevoegen... door uw taaknaam. Bijvoorbeeld 'Werkstroom Nummers'.
-
Voer in De naam van de taak een naam in voor de eerste taak, bijvoorbeeld
Ingest_songs_data
. - Selecteer in Typehet taaktype Notebook.
- Selecteer in bronde werkruimte.
- Gebruik de bestandsbrowser om het notitieblok voor gegevensopname te vinden, klik op de naam van het notitieblok en klik op Bevestigen.
- Selecteer in ClusterShared_job_cluster of het cluster dat u in de
Create a cluster
stap hebt gemaakt. - Klik op Create.
- Klik op de knop
onder de taak die u zojuist hebt gemaakt en selecteer Notitieblok.
-
Voer in De naam van de taak een naam in voor de taak, bijvoorbeeld
Prepare_songs_data
. - Selecteer in Typehet taaktype Notebook.
- Selecteer in bronde werkruimte.
- Gebruik de bestandsbrowser om het notitieblok voor gegevensvoorbereiding te vinden, klik op de naam van het notitieblok en klik op Bevestigen.
- Selecteer in ClusterShared_job_cluster of het cluster dat u in de
Create a cluster
stap hebt gemaakt. - Klik op Create.
- Klik op de knop
onder de taak die u zojuist hebt gemaakt en selecteer Notitieblok.
-
Voer in De naam van de taak een naam in voor de taak, bijvoorbeeld
Analyze_songs_data
. - Selecteer in Typehet taaktype Notebook.
- Selecteer in bronde werkruimte.
- Gebruik de bestandsbrowser om het notitieblok voor gegevensanalyse te vinden, klik op de naam van het notitieblok en klik op Bevestigen.
- Selecteer in ClusterShared_job_cluster of het cluster dat u in de
Create a cluster
stap hebt gemaakt. - Klik op Create.
- Als u de werkstroom wilt uitvoeren, klikt u op
. Als u de details van voor de uitvoeringwilt bekijken, klikt u op de link in de kolom begintijd voor de uitvoering in de weergave van de taakuitvoeringen. Klik op elke taak om de details voor de taakuitvoering weer te geven.
- Als u de resultaten wilt weergeven wanneer de werkstroom is voltooid, klikt u op de uiteindelijke taak voor gegevensanalyse. De uitvoerpagina wordt weergegeven en de queryresultaten worden weergegeven.
Stap 7: De gegevenspijplijntaak plannen
Notitie
Als u wilt laten zien hoe u een Azure Databricks-taak gebruikt om een geplande werkstroom te organiseren, scheidt dit aan de slag-voorbeeld de stappen voor opname, voorbereiding en analyse in afzonderlijke notebooks. Vervolgens wordt elke notebook gebruikt om een taak in de taak te maken. Als alle verwerkingen zijn opgenomen in één notebook, kunt u het notebook eenvoudig rechtstreeks vanuit de gebruikersinterface van azure Databricks-notebook plannen. Zie Geplande notebooktaken maken en beheren.
Een veelvoorkomende vereiste is om een gegevenspijplijn op geplande basis uit te voeren. Een planning definiëren voor de taak die de pijplijn uitvoert:
- Klik op
Werkstromen in de zijbalk.
- Klik in de kolom Naam op de taaknaam. In het zijpaneel worden de taakgegevens weergegeven.
- Klik op Trigger toevoegen in het deelvenster Taakdetails en selecteer Gepland in Triggertype.
- Geef de periode, begintijd en tijdzone op. Schakel desgewenst het selectievakje Cron-syntaxis weergeven in om het schema weer te geven en te bewerken in Kwarts cron syntaxis.
- Klik op Opslaan.
Meer informatie
- Zie Inleiding tot Databricks-notebooks voor meer informatie over Databricks-notebooks.
- Zie Wat zijn taken voor meer informatie over Azure Databricks-taken?.
- Zie Wat is Delta Lake?voor meer informatie over Delta Lake.
- Zie Wat is Delta Live Tables voor meer informatie over pijplijnen voor gegevensverwerking met Delta Live Tables?.