Udostępnij za pośrednictwem


Tworzenie kompleksowego potoku danych w usłudze Databricks

W tym artykule przedstawiono, jak stworzyć i wdrożyć kompleksowy potok przetwarzania danych, w tym jak pozyskiwać dane surowe, przekształcać dane i przeprowadzać analizy na przetworzonych danych.

Uwaga

Mimo że w tym artykule pokazano, jak utworzyć kompletny potok danych przy użyciu notatników usługi Databricks i zadań usługi Azure Databricks do organizowania przepływu pracy, usługa Databricks zaleca używanie DLT, interfejsu deklaratywnego do tworzenia niezawodnych, łatwych w utrzymaniu i testowalnych potoków przetwarzania danych.

Co to jest potok danych?

Potok danych implementuje kroki wymagane do przenoszenia danych z systemów źródłowych, przekształcania tych danych na podstawie wymagań i przechowywania danych w systemie docelowym. Potok danych obejmuje wszystkie procesy niezbędne do przekształcenia danych pierwotnych w przygotowane dane, które użytkownicy mogą wykorzystywać. Na przykład potok danych może przygotować dane, aby analitycy danych i naukowcy danych mogli wyodrębnić wartość z danych poprzez analizę i raportowanie.

Przepływ pracy wyodrębniania, przekształcania i ładowania (ETL) jest typowym przykładem strumienia danych. W przypadku przetwarzania ETL dane są pozyskiwane z systemów źródłowych i zapisywane w obszarze przejściowym, przekształcane na podstawie wymagań (zapewnienie jakości danych, deduplikowanie rekordów itd.), a następnie zapisywane w systemie docelowym, takim jak magazyn danych lub data lake.

Kroki rurociągu danych

Aby ułatwić rozpoczęcie tworzenia potoków danych w usłudze Azure Databricks, przykład zawarty w tym artykule zawiera instrukcje tworzenia przepływu pracy przetwarzania danych:

  • Używanie funkcji usługi Azure Databricks do eksplorowania nieprzetworzonego zestawu danych.
  • Utwórz notatnik Databricks do pozyskiwania nieprzetworzonych danych źródłowych i zapisywania ich w tabeli docelowej.
  • Utwórz notatnik usługi Databricks, aby przekształcić nieprzetworzone dane źródłowe i zapisać przekształcone dane w tabeli docelowej.
  • Utwórz notatnik usługi Databricks do zapytań dotyczących przekształconych danych.
  • Automatyzowanie potoku danych za pomocą zadania usługi Azure Databricks.

Wymagania

Przykład: zestaw danych z milionem piosenek

Zestaw danych używany w tym przykładzie jest podzbiorem zestawu danych Million Song Dataset, kolekcją funkcji i metadanych utworów muzyki współczesnej. Ten zestaw danych jest dostępny w przykładowych zestawach danych zawartych w obszarze roboczym usługi Azure Databricks.

Krok 1. Tworzenie zasobu obliczeniowego

Aby wykonać przetwarzanie i analizę danych w tym przykładzie, utwórz zasób obliczeniowy do uruchamiania poleceń.

Uwaga

Ponieważ w tym przykładzie jest używany przykładowy zestaw danych przechowywany w systemie plików DBFS i zaleca utrwalanie tabel w celu katalogu aparatu Unity, należy utworzyć zasób obliczeniowy skonfigurowany z dedykowanym trybem dostępu. Tryb dedykowanego dostępu zapewnia pełny dostęp do systemu plików DBFS, a także umożliwia dostęp do Unity Catalog. Zobacz Najlepsze praktyki dla DBFS i Unity Catalog.

  1. Kliknij Obliczenia na pasku bocznym.
  2. Na stronie Obliczenia kliknij Utwórz obliczenia.
  3. Na nowej stronie obliczeniowej wprowadź unikatową nazwę zasobu obliczeniowego.
  4. W obszarze Advancedprzełącz ustawienie trybu dostępu na Ręczne, a następnie wybierz tryb dedykowany.
  5. W użytkownik lub grupawybierz swoją nazwę użytkownika.
  6. Pozostaw pozostałe wartości w stanie domyślnym, a następnie kliknij pozycję Utwórz.

Aby dowiedzieć się więcej na temat zasobów obliczeniowych usługi Databricks, zobacz Compute.

Krok 2. Eksplorowanie danych źródłowych

Aby dowiedzieć się, jak używać interfejsu usługi Azure Databricks do eksplorowania surowych danych źródłowych, zobacz Eksplorowanie danych źródłowych dla potoku danych. Jeśli chcesz przejść bezpośrednio do pozyskiwania i przygotowywania danych, przejdź do kroku 3. Pozyskiwanie danych pierwotnych.

Krok 3. Pozyskiwanie danych pierwotnych

W tym kroku załadujesz nieprzetworzone dane do tabeli, aby udostępnić je do dalszego przetwarzania. Aby zarządzać zasobami danych na platformie Databricks, np. tabelami, Databricks zaleca Unity Catalog. Jeśli jednak nie masz uprawnień do tworzenia wymaganego katalogu i schematu do publikowania tabel w Unity Catalog, nadal można wykonać następujące kroki, publikując tabele w metastore Hive.

Aby pozyskiwać dane, usługa Databricks zaleca korzystanie z modułu automatycznego ładowania. Moduł ładujący automatycznie wykrywa i przetwarza nowe pliki, gdy docierają do magazynu obiektów w chmurze.

Automatyczne ładowanie można skonfigurować tak, aby automatycznie wykrywał schemat załadowanych danych, umożliwiając inicjowanie tabel bez jawnego deklarowania schematu danych i rozwijania schematu tabeli w miarę wprowadzania nowych kolumn. Eliminuje to konieczność ręcznego śledzenia i stosowania zmian schematu w czasie. Usługa Databricks zaleca wnioskowanie schematu podczas korzystania z Auto Loader. Jednak, jak widzimy w kroku eksploracji danych, dane utworów nie zawierają informacji w nagłówku. Ponieważ nagłówek nie jest przechowywany z danymi, należy jawnie zdefiniować schemat, jak pokazano w następnym przykładzie.

  1. Na pasku bocznym kliknij Nowa ikonaNowy i wybierz Notatnik z menu. Zostanie wyświetlone okno dialogowe Tworzenie notesu .

  2. Wprowadź nazwę notesu, na przykład Ingest songs data. Domyślnie:

  3. Wprowadź następujące informacje w pierwszej komórce notesu:

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    Jeśli używasz Unity Catalog, zastąp <table-name> katalogiem, schematem i nazwą tabeli, aby zawierały pozyskane rekordy (na przykład data_pipelines.songs_data.raw_song_data). W przeciwnym razie zastąp <table-name> nazwą tabeli, która będzie zawierać pozyskane rekordy, na przykład raw_song_data.

    Zastąp <checkpoint-path> ścieżką do katalogu w systemie plików DBFS, aby zachować pliki punktów kontrolnych, na przykład /tmp/pipeline_get_started/_checkpoint/song_data.

  4. Kliknij pozycję Menu Uruchamiania, a następnie wybierz pozycję Uruchom komórkę. W tym przykładzie zdefiniowano schemat danych przy użyciu informacji z README, pozyskiwane są dane utworów ze wszystkich plików zawartych w file_path, a dane są zapisywane w tabeli określonej przez table_name.

Krok 4. Przygotowywanie danych pierwotnych

Aby przygotować nieprzetworzone dane do analizy, poniższe kroki przekształcają nieprzetworzone dane utworów przez odfiltrowanie niepotrzebnych kolumn i dodanie nowego pola zawierającego znacznik czasu tworzenia nowego rekordu.

  1. Na pasku bocznym kliknij pozycję Nowa ikonaNowy i wybierz pozycję Notatnik z menu. Zostanie wyświetlone okno dialogowe Tworzenie notesu .

  2. Wprowadź nazwę notesu. Na przykład Prepare songs data. Zmień język domyślny na SQL.

  3. Wprowadź następujące informacje w pierwszej komórce notesu:

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    Jeśli używasz Unity Catalog, zastąp <table-name> katalogiem, schematem i nazwą tabeli, aby zawierały przefiltrowane i przekształcone rekordy (na przykład data_pipelines.songs_data.prepared_song_data). W przeciwnym razie zastąp <table-name> ciąg nazwą tabeli zawierającą przefiltrowane i przekształcone rekordy (na przykład prepared_song_data).

    Zastąp <raw-songs-table-name> nazwą tabeli zawierającej nieprzetworzone rekordy utworów pozyskane w poprzednim kroku.

  4. Kliknij Menu Uruchamiania, a następnie wybierz Uruchom komórkę.

Krok 5. Wykonywanie zapytań dotyczących przekształconych danych

W tym kroku rozszerzysz potok przetwarzania, dodając zapytania do analizowania danych utworów. Te zapytania używają przygotowanych rekordów utworzonych w poprzednim kroku.

  1. Na pasku bocznym kliknij Nowa ikonaNowy i wybierz Notatnik z menu. Zostanie wyświetlone okno dialogowe Tworzenie notesu .

  2. Wprowadź nazwę notesu. Na przykład Analyze songs data. Zmień język domyślny na SQL.

  3. Wprowadź następujące informacje w pierwszej komórce notesu:

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    Zastąp <prepared-songs-table-name> nazwą tabeli zawierającej przygotowane dane. Na przykład data_pipelines.songs_data.prepared_song_data.

  4. Kliknij strzałkę w dół w menu akcji komórki, wybierz Dodaj komórkę poniżej i wprowadź następującą treść w nowej komórce:

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    Zastąp <prepared-songs-table-name> nazwą przygotowanej tabeli utworzonej w poprzednim kroku. Na przykład data_pipelines.songs_data.prepared_song_data.

  5. Aby uruchomić zapytania i wyświetlić dane wyjściowe, kliknij pozycję Uruchom wszystko.

Krok 6: Utwórz zadanie Azure Databricks do uruchomienia potoku

Możesz utworzyć przepływ pracy, aby zautomatyzować uruchamianie kroków pozyskiwania, przetwarzania i analizy danych przy użyciu zadania usługi Azure Databricks.

  1. W obszarze roboczym Nauka o danych i inżynierii wykonaj jedną z następujących czynności:
    • Kliknij Ikona Przepływów PracyPrzepływy Pracy na pasku bocznym i kliknij Przycisk Utwórz zadanie.
    • Na pasku bocznym kliknij pozycję Nowa ikonaNowy i wybierz pozycję Zadanie.
  2. W oknie dialogowym zadania na karcie Zadania zastąp ciąg Dodaj nazwę zadania... nazwą zadania. Na przykład "Proces utworów".
  3. W polu Nazwa zadania wprowadź nazwę pierwszego zadania, na przykład Ingest_songs_data.
  4. W Typ wybierz typ zadania Notebook.
  5. W obszarze Źródło wybierz pozycję Obszar roboczy.
  6. W polu Ścieżka użyj przeglądarki plików, aby znaleźć notatnik do pobierania danych, a następnie kliknij Potwierdź.
  7. W computewybierz zasób obliczeniowy utworzony w kroku Create a compute resource.
  8. Kliknij pozycję Utwórz.
  9. Kliknij Przycisk Dodaj zadanie poniżej właśnie utworzonego zadania i wybierz Notatnik.
  10. W polu Nazwa zadania wprowadź nazwę zadania, na przykład Prepare_songs_data.
  11. W Typie wybierz typ zadania Notebook.
  12. W obszarze Źródło wybierz pozycję Obszar roboczy.
  13. Użyj przeglądarki plików, aby znaleźć notes przygotowywania danych, kliknij nazwę notesu, a następnie kliknij przycisk Potwierdź.
  14. W computewybierz zasób obliczeniowy utworzony w kroku Create a compute resource.
  15. Kliknij pozycję Utwórz.
  16. Kliknij Przycisk Dodaj zadanie poniżej utworzonego zadania i wybierz Notatnik.
  17. W polu Nazwa zadania wprowadź nazwę zadania, na przykład Analyze_songs_data.
  18. W Typie wybierz typ zadania Notebook.
  19. W obszarze Źródło wybierz pozycję Obszar roboczy.
  20. Użyj przeglądarki plików, aby znaleźć notatnik analizy danych, kliknij nazwę notatnika, a następnie kliknij Potwierdź.
  21. W computewybierz zasób obliczeniowy utworzony w kroku Create a compute resource.
  22. Kliknij pozycję Utwórz.
  23. Aby uruchomić przepływ pracy, kliknij pozycję Przycisk Uruchom teraz. Aby wyświetlić szczegóły przebiegu, kliknij link w kolumnie Godzina rozpoczęcia przebiegu w widoku przebiegów zadania. Kliknij każde zadanie, aby wyświetlić szczegóły przebiegu zadania.
  24. Aby wyświetlić wyniki po zakończeniu przepływu pracy, kliknij ostateczne zadanie analizy danych. Zostanie wyświetlona strona Dane wyjściowe i wyświetli wyniki zapytania.

Krok 7. Planowanie zadania potoku danych

Uwaga

Aby zademonstrować użycie zadania usługi Azure Databricks do orkiestracji zaplanowanego przepływu pracy, ten przykład wprowadzający oddziela etapy pozyskiwania, przygotowywania i analizy na oddzielnych notatnikach, a każdy notatnik jest następnie używany do utworzenia zadania w ramach pracy. Jeśli wszystkie operacje przetwarzania znajdują się w jednym notesie, możesz łatwo zaplanować notes bezpośrednio z poziomu interfejsu użytkownika notesu usługi Azure Databricks. Zobacz Tworzenie zaplanowanych zadań notesu i zarządzanie nimi.

Typowym wymaganiem jest uruchomienie rury danych według harmonogramu. Aby zdefiniować harmonogram zadania, w ramach którego jest uruchamiany potok:

  1. Kliknij pozycję Ikona przepływów pracyPrzepływy pracy na pasku bocznym.
  2. W kolumnie Nazwa kliknij nazwę zadania. Na panelu bocznym są wyświetlane szczegóły zadania.
  3. Kliknij pozycję Dodaj wyzwalacz na panelu Szczegóły zadania i wybierz pozycję Zaplanowane w polu Typ wyzwalacza.
  4. Określ okres, czas rozpoczęcia i strefę czasową. Opcjonalnie zaznacz pole wyboru Pokaż składnię Cron, aby wyświetlić i edytować harmonogram w składni Quartz Cron.
  5. Kliknij przycisk Zapisz.

Dowiedz się więcej