Uw eerste ETL-workload uitvoeren in Azure Databricks
Meer informatie over het gebruik van hulpprogramma's die gereed zijn voor productie van Azure Databricks voor het ontwikkelen en implementeren van uw eerste ETL-pijplijnen (extract, transform and load) voor gegevensindeling.
Aan het einde van dit artikel voelt u zich comfortabel:
- Een Databricks-rekencluster voor alle doeleinden starten.
- Een Databricks-notebook maken.
- Incrementele gegevensopname configureren voor Delta Lake met automatische laadprogramma's.
- Notebookcellen uitvoeren om gegevens te verwerken, op te vragen en te bekijken.
- Een notebook plannen als een Databricks-taak.
In deze zelfstudie worden interactieve notebooks gebruikt om algemene ETL-taken in Python of Scala te voltooien.
U kunt ook Delta Live Tables gebruiken om ETL-pijplijnen te bouwen. Databricks heeft Delta Live Tables gemaakt om de complexiteit van het bouwen, implementeren en onderhouden van ETL-pijplijnen voor productie te verminderen. Zie zelfstudie: Uw eerste Delta Live Tables-pijplijn uitvoeren.
U kunt ook de Databricks Terraform-provider gebruiken om de resources van dit artikel te maken. Zie Clusters, notebooks en taken maken met Terraform.
Vereisten
- U bent aangemeld bij een Azure Databricks-werkruimte.
- U bent gemachtigd om een cluster te maken.
Notitie
Als u geen bevoegdheden voor clusterbeheer hebt, kunt u de meeste van de onderstaande stappen nog steeds voltooien zolang u toegang hebt tot een cluster.
Stap 1: Een cluster maken
Als u verkennende gegevensanalyse en data engineering wilt uitvoeren, maakt u een cluster om de rekenresources te bieden die nodig zijn om opdrachten uit te voeren.
- Klik op Compute in de zijbalk.
- Klik op de pagina Compute op Cluster maken. Hiermee opent u de pagina Nieuw cluster.
- Geef een unieke naam op voor het cluster, laat de resterende waarden in de standaardstatus staan en klik op Cluster maken.
Zie Compute voor meer informatie over Databricks-clusters.
Stap 2: Een Databricks-notebook maken
Als u een notitieblok in uw werkruimte wilt maken, klikt u op Nieuw in de zijbalk en vervolgens op Notitieblok. Er wordt een leeg notitieblok geopend in de werkruimte.
Zie Notitieblokken beheren voor meer informatie over het maken en beheren van notitieblokken.
Stap 3: Automatische laadprogramma configureren voor het opnemen van gegevens naar Delta Lake
Databricks raadt het gebruik van automatische laadprogramma's aan voor incrementele gegevensopname. Automatisch laden detecteert en verwerkt automatisch nieuwe bestanden wanneer ze binnenkomen in de opslag van cloudobjecten.
Databricks raadt aan om gegevens op te slaan met Delta Lake. Delta Lake is een opensource-opslaglaag die ACID-transacties biedt en die data lakehouse mogelijk maakt. Delta Lake is de standaardindeling voor tabellen die zijn gemaakt in Databricks.
Als u automatisch laden wilt configureren voor het opnemen van gegevens naar een Delta Lake-tabel, kopieert en plakt u de volgende code in de lege cel in uw notebook:
Python
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
Scala
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"
// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)
// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(Trigger.AvailableNow)
.toTable(table_name)
Notitie
Met de variabelen die in deze code zijn gedefinieerd, kunt u deze veilig uitvoeren zonder dat er een conflict bestaat met bestaande werkruimteactiva of andere gebruikers. Beperkte netwerk- of opslagmachtigingen veroorzaken fouten bij het uitvoeren van deze code; neem contact op met uw werkruimtebeheerder om problemen met deze beperkingen op te lossen.
Zie Wat is Automatisch laadprogramma?voor meer informatie over Automatisch laden.
Stap 4: Gegevens verwerken en ermee werken
Notebooks voeren logische cellen per cel uit. De logica in de cel uitvoeren:
Als u de cel wilt uitvoeren die u in de vorige stap hebt voltooid, selecteert u de cel en drukt u op Shift+Enter.
Als u een query wilt uitvoeren op de tabel die u zojuist hebt gemaakt, kopieert en plakt u de volgende code in een lege cel en drukt u op Shift+Enter om de cel uit te voeren.
Python
df = spark.read.table(table_name)
Scala
val df = spark.read.table(table_name)
Als u een voorbeeld van de gegevens in uw DataFrame wilt bekijken, kopieert en plakt u de volgende code in een lege cel en drukt u op Shift+Enter om de cel uit te voeren.
Python
display(df)
Scala
display(df)
Zie Visualisaties in Databricks-notebooks voor meer informatie over interactieve opties voor het visualiseren van gegevens.
Stap 5: Een taak plannen
U kunt Databricks-notebooks uitvoeren als productiescripts door ze toe te voegen als een taak in een Databricks-taak. In deze stap maakt u een nieuwe taak die u handmatig kunt activeren.
Uw notitieblok als taak plannen:
- Klik aan de rechterkant van de koptekstbalk op Planning .
- Voer een unieke naam in voor de taaknaam.
- Klik op Handmatig.
- Selecteer in de vervolgkeuzelijst Cluster het cluster dat u in stap 1 hebt gemaakt.
- Klik op Create.
- Klik in het venster dat wordt weergegeven op Nu uitvoeren.
- Als u de resultaten van de taakuitvoering wilt zien, klikt u op het pictogram naast de tijdstempel van de laatste uitvoering .
Zie Wat zijn Databricks-taken? voor meer informatie over taken.
Aanvullende integraties
Meer informatie over integraties en hulpprogramma's voor data engineering met Azure Databricks: