Delen via


Een end-to-end gegevenspijplijn bouwen in Databricks

In dit artikel leest u hoe u een end-to-end pijplijn voor gegevensverwerking maakt en implementeert, waaronder het opnemen van onbewerkte gegevens, het transformeren van de gegevens en het uitvoeren van analyses op de verwerkte gegevens.

Notitie

Hoewel dit artikel laat zien hoe u een volledige gegevenspijplijn maakt met behulp van Databricks-notebooks en een Azure Databricks-taak voor het organiseren van een werkstroom, raadt Databricks aan om Delta Live Tables te gebruiken, een declaratieve interface voor het bouwen van betrouwbare, onderhoudbare en testbare pijplijnen voor gegevensverwerking.

Wat is een gegevenspijplijn?

Een gegevenspijplijn implementeert de stappen die nodig zijn om gegevens te verplaatsen van bronsystemen, die gegevens te transformeren op basis van vereisten en de gegevens op te slaan in een doelsysteem. Een gegevenspijplijn bevat alle processen die nodig zijn om onbewerkte gegevens om te zetten in voorbereide gegevens die gebruikers kunnen gebruiken. Een gegevenspijplijn kan bijvoorbeeld gegevens voorbereiden, zodat gegevensanalisten en gegevenswetenschappers waarde uit de gegevens kunnen extraheren via analyse en rapportage.

Een ETL-werkstroom (extraheren, transformeren en laden) is een veelvoorkomend voorbeeld van een gegevenspijplijn. Bij ETL-verwerking worden gegevens opgenomen uit bronsystemen en naar een faseringsgebied geschreven, getransformeerd op basis van vereisten (gegevenskwaliteit, ontdubbeling van records, enzovoort) en vervolgens naar een doelsysteem zoals een datawarehouse of data lake geschreven.

Stappen voor gegevenspijplijn

Om u te helpen bij het bouwen van gegevenspijplijnen in Azure Databricks, wordt in het voorbeeld in dit artikel uitgelegd hoe u een werkstroom voor gegevensverwerking maakt:

  • Gebruik Azure Databricks-functies om een onbewerkte gegevensset te verkennen.
  • Maak een Databricks-notebook om onbewerkte brongegevens op te nemen en de onbewerkte gegevens naar een doeltabel te schrijven.
  • Maak een Databricks-notebook om de onbewerkte brongegevens te transformeren en de getransformeerde gegevens naar een doeltabel te schrijven.
  • Maak een Databricks-notebook om een query uit te voeren op de getransformeerde gegevens.
  • Automatiseer de gegevenspijplijn met een Azure Databricks-taak.

Vereisten

Voorbeeld: Miljoen nummergegevensset

De gegevensset die in dit voorbeeld wordt gebruikt, is een subset van de Million Song Dataset, een verzameling functies en metagegevens voor hedendaagse muzieknummers. Deze gegevensset is beschikbaar in de voorbeeldgegevenssets die zijn opgenomen in uw Azure Databricks-werkruimte.

Stap 1: Een cluster maken

Als u de gegevensverwerking en -analyse in dit voorbeeld wilt uitvoeren, maakt u een cluster om de rekenresources op te geven die nodig zijn om opdrachten uit te voeren.

Notitie

Omdat in dit voorbeeld een voorbeeldgegevensset wordt gebruikt die is opgeslagen in DBFS en wordt aanbevolen om tabellen in Unity Catalog te behouden, maakt u een cluster dat is geconfigureerd met de modus voor toegang tot één gebruiker. De toegangsmodus voor één gebruiker biedt volledige toegang tot DBFS en maakt ook toegang tot Unity Catalog mogelijk. Zie aanbevolen procedures voor DBFS en Unity Catalog.

  1. Klik op Compute in de zijbalk.
  2. Klik op de pagina Compute op Cluster maken.
  3. Voer op de pagina Nieuw cluster een unieke naam in voor het cluster.
  4. Selecteer in de toegangsmodus de optie Eén gebruiker.
  5. Selecteer uw gebruikersnaam in toegang tot één gebruiker of service-principal.
  6. Laat de resterende waarden in de standaardstatus staan en klik op Cluster maken.

Zie Compute voor meer informatie over Databricks-clusters.

Stap 2: De brongegevens verkennen

Zie De brongegevens voor een gegevenspijplijn verkennen voor meer informatie over het gebruik van de Azure Databricks-interface om de onbewerkte brongegevens te verkennen. Als u rechtstreeks naar de opname en voorbereiding van de gegevens wilt gaan, gaat u verder met stap 3: de onbewerkte gegevens opnemen.

Stap 3: de onbewerkte gegevens opnemen

In deze stap laadt u de onbewerkte gegevens in een tabel om deze beschikbaar te maken voor verdere verwerking. Databricks raadt Unity Catalog aan om gegevensassets te beheren op het Databricks-platform, zoals tabellen. Als u echter geen machtigingen hebt om de vereiste catalogus en het schema te maken voor het publiceren van tabellen naar Unity Catalog, kunt u nog steeds de volgende stappen uitvoeren door tabellen te publiceren naar de Hive-metastore.

Databricks raadt aan om gegevens op te nemen. Automatisch laden detecteert en verwerkt automatisch nieuwe bestanden wanneer ze binnenkomen in de opslag van cloudobjecten.

U kunt automatisch laden configureren om het schema van geladen gegevens automatisch te detecteren, zodat u tabellen kunt initialiseren zonder expliciet het gegevensschema te declareren en het tabelschema te ontwikkelen naarmate er nieuwe kolommen worden geïntroduceerd. Hierdoor hoeft u geen schemawijzigingen handmatig bij te houden en toe te passen in de loop van de tijd. Databricks raadt schemadeductie aan bij het gebruik van automatische laadprogramma's. Zoals u echter in de stap voor gegevensverkenning ziet, bevatten de nummers geen headergegevens. Omdat de header niet is opgeslagen met de gegevens, moet u het schema expliciet definiëren, zoals wordt weergegeven in het volgende voorbeeld.

  1. Klik in de zijbalk op Nieuw pictogram Nieuw en selecteer Notitieblok in het menu. Het dialoogvenster Notitieblok maken wordt weergegeven.

  2. Voer bijvoorbeeld een naam in voor het notitieblok Ingest songs data. Standaard:

    • Python is de geselecteerde taal.
    • Het notebook is gekoppeld aan het laatste cluster dat u hebt gebruikt. In dit geval is het cluster dat u in stap 1 hebt gemaakt: een cluster maken.
  3. Voer het volgende in de eerste cel van het notebook in:

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    Als u Unity Catalog gebruikt, vervangt u deze door <table-name> een catalogus, schema en tabelnaam om de opgenomen records (bijvoorbeeld data_pipelines.songs_data.raw_song_data). <table-name> Vervang anders de naam van een tabel die de opgenomen records bevat, raw_song_databijvoorbeeld.

    Vervang <checkpoint-path> door een pad naar een map in DBFS om bijvoorbeeld /tmp/pipeline_get_started/_checkpoint/song_datacontrolepuntbestanden te onderhouden.

  4. Klik op Menu Uitvoerenen selecteer Cel uitvoeren. In dit voorbeeld wordt het gegevensschema gedefinieerd met behulp van de informatie uit de README, worden de nummersgegevens opgenomen uit alle bestanden in file_pathen worden de gegevens naar de tabel geschreven die door table_name.

Stap 4: De onbewerkte gegevens voorbereiden

Om de onbewerkte gegevens voor te bereiden voor analyse, transformeren de volgende stappen de onbewerkte nummersgegevens door overbodige kolommen te filteren en een nieuw veld toe te voegen dat een tijdstempel bevat voor het maken van de nieuwe record.

  1. Klik in de zijbalk op Nieuw pictogram Nieuw en selecteer Notitieblok in het menu. Het dialoogvenster Notitieblok maken wordt weergegeven.

  2. Voer een naam in voor het notitieblok. Bijvoorbeeld: Prepare songs data. Wijzig de standaardtaal in SQL.

  3. Voer het volgende in de eerste cel van het notebook in:

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    Als u Unity Catalog gebruikt, vervangt u deze door <table-name> een catalogus, schema en tabelnaam om de gefilterde en getransformeerde records (bijvoorbeeld data_pipelines.songs_data.prepared_song_data). <table-name> Vervang anders de naam van een tabel die de gefilterde en getransformeerde records bevat (bijvoorbeeld prepared_song_data).

    Vervang <raw-songs-table-name> door de naam van de tabel met de onbewerkte nummersrecords die in de vorige stap zijn opgenomen.

  4. Klik op Menu Uitvoerenen selecteer Cel uitvoeren.

Stap 5: Een query uitvoeren op de getransformeerde gegevens

In deze stap breidt u de verwerkingspijplijn uit door query's toe te voegen om de nummersgegevens te analyseren. Deze query's maken gebruik van de voorbereide records die in de vorige stap zijn gemaakt.

  1. Klik in de zijbalk op Nieuw pictogram Nieuw en selecteer Notitieblok in het menu. Het dialoogvenster Notitieblok maken wordt weergegeven.

  2. Voer een naam in voor het notitieblok. Bijvoorbeeld: Analyze songs data. Wijzig de standaardtaal in SQL.

  3. Voer het volgende in de eerste cel van het notebook in:

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    Vervang door <prepared-songs-table-name> de naam van de tabel met voorbereide gegevens. Bijvoorbeeld: data_pipelines.songs_data.prepared_song_data.

  4. Klik Omlaag caret in het menu celacties, selecteer Cel onder toevoegen en voer het volgende in de nieuwe cel in:

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    Vervang door <prepared-songs-table-name> de naam van de voorbereide tabel die u in de vorige stap hebt gemaakt. Bijvoorbeeld: data_pipelines.songs_data.prepared_song_data.

  5. Als u de query's wilt uitvoeren en de uitvoer wilt weergeven, klikt u op Alles uitvoeren.

Stap 6: Een Azure Databricks-taak maken om de pijplijn uit te voeren

U kunt een werkstroom maken om het uitvoeren van de stappen voor gegevensopname, verwerking en analyse te automatiseren met behulp van een Azure Databricks-taak.

  1. Voer in uw Datawetenschap & Engineering-werkruimte een van de volgende handelingen uit:
    • Klik op Pictogram WerkstromenWerkstromen in de zijbalk en klik op .Knop Taak maken
    • Klik in de zijbalk op Nieuw pictogram Nieuw en selecteer Taak.
  2. Vervang in het taakdialoogvenster op het tabblad Taken een naam voor uw taak toevoegen... door uw taaknaam. Bijvoorbeeld 'Werkstroom Nummers'.
  3. Voer in De naam van de taak een naam in voor de eerste taak, bijvoorbeeldIngest_songs_data.
  4. Selecteer in Type het taaktype Notebook .
  5. Selecteer Werkruimte in Bron.
  6. Gebruik de bestandsbrowser om het notitieblok voor gegevensopname te vinden, klik op de naam van het notitieblok en klik op Bevestigen.
  7. Selecteer in Cluster Shared_job_cluster of het cluster dat u in de Create a cluster stap hebt gemaakt.
  8. Klik op Create.
  9. Klik Knop Taak toevoegen onder de taak die u zojuist hebt gemaakt en selecteer Notitieblok.
  10. Voer in De naam van de taak een naam in voor de taak, bijvoorbeeldPrepare_songs_data.
  11. Selecteer in Type het taaktype Notebook .
  12. Selecteer Werkruimte in Bron.
  13. Gebruik de bestandsbrowser om het notitieblok voor gegevensvoorbereiding te vinden, klik op de naam van het notitieblok en klik op Bevestigen.
  14. Selecteer in Cluster Shared_job_cluster of het cluster dat u in de Create a cluster stap hebt gemaakt.
  15. Klik op Create.
  16. Klik Knop Taak toevoegen onder de taak die u zojuist hebt gemaakt en selecteer Notitieblok.
  17. Voer in De naam van de taak een naam in voor de taak, bijvoorbeeldAnalyze_songs_data.
  18. Selecteer in Type het taaktype Notebook .
  19. Selecteer Werkruimte in Bron.
  20. Gebruik de bestandsbrowser om het notitieblok voor gegevensanalyse te vinden, klik op de naam van het notitieblok en klik op Bevestigen.
  21. Selecteer in Cluster Shared_job_cluster of het cluster dat u in de Create a cluster stap hebt gemaakt.
  22. Klik op Create.
  23. Als u de werkstroom wilt uitvoeren, klikt u op Knop Nu uitvoeren. Als u details voor de uitvoering wilt weergeven, klikt u op de koppeling in de kolom Begintijd voor de uitvoering in de weergave taakuitvoeringen. Klik op elke taak om de details voor de taakuitvoering weer te geven.
  24. Als u de resultaten wilt weergeven wanneer de werkstroom is voltooid, klikt u op de uiteindelijke taak voor gegevensanalyse. De uitvoerpagina wordt weergegeven en de queryresultaten worden weergegeven.

Stap 7: De gegevenspijplijntaak plannen

Notitie

Als u wilt laten zien hoe u een Azure Databricks-taak gebruikt om een geplande werkstroom te organiseren, scheidt dit aan de slag-voorbeeld de stappen voor opname, voorbereiding en analyse in afzonderlijke notebooks. Vervolgens wordt elke notebook gebruikt om een taak in de taak te maken. Als alle verwerkingen zijn opgenomen in één notebook, kunt u het notebook eenvoudig rechtstreeks vanuit de gebruikersinterface van azure Databricks-notebook plannen. Zie Geplande notebooktaken maken en beheren.

Een veelvoorkomende vereiste is om een gegevenspijplijn op geplande basis uit te voeren. Een planning definiëren voor de taak die de pijplijn uitvoert:

  1. Klik op Pictogram Werkstromen Werkstromen in de zijbalk.
  2. Klik in de kolom Naam op de taaknaam. In het zijpaneel worden de taakgegevens weergegeven.
  3. Klik op Trigger toevoegen in het deelvenster Taakdetails en selecteer Gepland in triggertype.
  4. Geef de periode, begintijd en tijdzone op. Schakel desgewenst het selectievakje Cron-syntaxis weergeven in om het schema weer te geven en te bewerken in kwarts cron syntaxis.
  5. Klik op Opslaan.

Meer informatie