Tworzenie kompleksowego potoku danych w usłudze Databricks
W tym artykule przedstawiono, jak stworzyć i wdrożyć kompleksowy potok przetwarzania danych, w tym jak pozyskiwać dane surowe, przekształcać dane i przeprowadzać analizy na przetworzonych danych.
Uwaga
Mimo że w tym artykule pokazano, jak utworzyć kompletny potok danych przy użyciu notatników usługi Databricks i zadań usługi Azure Databricks do organizowania przepływu pracy, usługa Databricks zaleca używanie DLT, interfejsu deklaratywnego do tworzenia niezawodnych, łatwych w utrzymaniu i testowalnych potoków przetwarzania danych.
Co to jest potok danych?
Potok danych implementuje kroki wymagane do przenoszenia danych z systemów źródłowych, przekształcania tych danych na podstawie wymagań i przechowywania danych w systemie docelowym. Potok danych obejmuje wszystkie procesy niezbędne do przekształcenia danych pierwotnych w przygotowane dane, które użytkownicy mogą wykorzystywać. Na przykład potok danych może przygotować dane, aby analitycy danych i naukowcy danych mogli wyodrębnić wartość z danych poprzez analizę i raportowanie.
Przepływ pracy wyodrębniania, przekształcania i ładowania (ETL) jest typowym przykładem strumienia danych. W przypadku przetwarzania ETL dane są pozyskiwane z systemów źródłowych i zapisywane w obszarze przejściowym, przekształcane na podstawie wymagań (zapewnienie jakości danych, deduplikowanie rekordów itd.), a następnie zapisywane w systemie docelowym, takim jak magazyn danych lub data lake.
Kroki rurociągu danych
Aby ułatwić rozpoczęcie tworzenia potoków danych w usłudze Azure Databricks, przykład zawarty w tym artykule zawiera instrukcje tworzenia przepływu pracy przetwarzania danych:
- Używanie funkcji usługi Azure Databricks do eksplorowania nieprzetworzonego zestawu danych.
- Utwórz notatnik Databricks do pozyskiwania nieprzetworzonych danych źródłowych i zapisywania ich w tabeli docelowej.
- Utwórz notatnik usługi Databricks, aby przekształcić nieprzetworzone dane źródłowe i zapisać przekształcone dane w tabeli docelowej.
- Utwórz notatnik usługi Databricks do zapytań dotyczących przekształconych danych.
- Automatyzowanie potoku danych za pomocą zadania usługi Azure Databricks.
Wymagania
- Jesteś zalogowany do usługi Azure Databricks i znajdujesz się w obszarze roboczym Nauka o danych i Inżynieria.
- Masz uprawnienia do tworzenia zasobu obliczeniowego lub dostępu do zasobu obliczeniowego.
- (Opcjonalnie) Aby opublikować tabele do Unity Catalog, należy utworzyć katalog i schemat w Unity Catalog.
Przykład: zestaw danych z milionem piosenek
Zestaw danych używany w tym przykładzie jest podzbiorem zestawu danych Million Song Dataset, kolekcją funkcji i metadanych utworów muzyki współczesnej. Ten zestaw danych jest dostępny w przykładowych zestawach danych zawartych w obszarze roboczym usługi Azure Databricks.
Krok 1. Tworzenie zasobu obliczeniowego
Aby wykonać przetwarzanie i analizę danych w tym przykładzie, utwórz zasób obliczeniowy do uruchamiania poleceń.
Uwaga
Ponieważ w tym przykładzie jest używany przykładowy zestaw danych przechowywany w systemie plików DBFS i zaleca utrwalanie tabel w celu katalogu aparatu Unity, należy utworzyć zasób obliczeniowy skonfigurowany z dedykowanym trybem dostępu. Tryb dedykowanego dostępu zapewnia pełny dostęp do systemu plików DBFS, a także umożliwia dostęp do Unity Catalog. Zobacz Najlepsze praktyki dla DBFS i Unity Catalog.
- Kliknij Obliczenia na pasku bocznym.
- Na stronie Obliczenia kliknij Utwórz obliczenia.
- Na nowej stronie obliczeniowej wprowadź unikatową nazwę zasobu obliczeniowego.
- W obszarze Advancedprzełącz ustawienie trybu dostępu na Ręczne, a następnie wybierz tryb dedykowany.
- W użytkownik lub grupawybierz swoją nazwę użytkownika.
- Pozostaw pozostałe wartości w stanie domyślnym, a następnie kliknij pozycję Utwórz.
Aby dowiedzieć się więcej na temat zasobów obliczeniowych usługi Databricks, zobacz Compute.
Krok 2. Eksplorowanie danych źródłowych
Aby dowiedzieć się, jak używać interfejsu usługi Azure Databricks do eksplorowania surowych danych źródłowych, zobacz Eksplorowanie danych źródłowych dla potoku danych. Jeśli chcesz przejść bezpośrednio do pozyskiwania i przygotowywania danych, przejdź do kroku 3. Pozyskiwanie danych pierwotnych.
Krok 3. Pozyskiwanie danych pierwotnych
W tym kroku załadujesz nieprzetworzone dane do tabeli, aby udostępnić je do dalszego przetwarzania. Aby zarządzać zasobami danych na platformie Databricks, np. tabelami, Databricks zaleca Unity Catalog. Jeśli jednak nie masz uprawnień do tworzenia wymaganego katalogu i schematu do publikowania tabel w Unity Catalog, nadal można wykonać następujące kroki, publikując tabele w metastore Hive.
Aby pozyskiwać dane, usługa Databricks zaleca korzystanie z modułu automatycznego ładowania. Moduł ładujący automatycznie wykrywa i przetwarza nowe pliki, gdy docierają do magazynu obiektów w chmurze.
Automatyczne ładowanie można skonfigurować tak, aby automatycznie wykrywał schemat załadowanych danych, umożliwiając inicjowanie tabel bez jawnego deklarowania schematu danych i rozwijania schematu tabeli w miarę wprowadzania nowych kolumn. Eliminuje to konieczność ręcznego śledzenia i stosowania zmian schematu w czasie. Usługa Databricks zaleca wnioskowanie schematu podczas korzystania z Auto Loader. Jednak, jak widzimy w kroku eksploracji danych, dane utworów nie zawierają informacji w nagłówku. Ponieważ nagłówek nie jest przechowywany z danymi, należy jawnie zdefiniować schemat, jak pokazano w następnym przykładzie.
Na pasku bocznym kliknij
Nowy i wybierz Notatnik z menu. Zostanie wyświetlone okno dialogowe Tworzenie notesu .
Wprowadź nazwę notesu, na przykład
Ingest songs data
. Domyślnie:- Język Python jest wybranym językiem.
- Notatnik jest dołączony do ostatnio używanego zasobu obliczeniowego. W takim przypadku zasób utworzony w Krok 1: Tworzenie zasobu obliczeniowego.
Wprowadź następujące informacje w pierwszej komórce notesu:
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define variables used in the code below file_path = "/databricks-datasets/songs/data-001/" table_name = "<table-name>" checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data" schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path) .writeStream .option("checkpointLocation", checkpoint_path) .trigger(availableNow=True) .toTable(table_name) )
Jeśli używasz Unity Catalog, zastąp
<table-name>
katalogiem, schematem i nazwą tabeli, aby zawierały pozyskane rekordy (na przykładdata_pipelines.songs_data.raw_song_data
). W przeciwnym razie zastąp<table-name>
nazwą tabeli, która będzie zawierać pozyskane rekordy, na przykładraw_song_data
.Zastąp
<checkpoint-path>
ścieżką do katalogu w systemie plików DBFS, aby zachować pliki punktów kontrolnych, na przykład/tmp/pipeline_get_started/_checkpoint/song_data
.Kliknij pozycję
, a następnie wybierz pozycję Uruchom komórkę. W tym przykładzie zdefiniowano schemat danych przy użyciu informacji z
README
, pozyskiwane są dane utworów ze wszystkich plików zawartych wfile_path
, a dane są zapisywane w tabeli określonej przeztable_name
.
Krok 4. Przygotowywanie danych pierwotnych
Aby przygotować nieprzetworzone dane do analizy, poniższe kroki przekształcają nieprzetworzone dane utworów przez odfiltrowanie niepotrzebnych kolumn i dodanie nowego pola zawierającego znacznik czasu tworzenia nowego rekordu.
Na pasku bocznym kliknij pozycję
Nowy i wybierz pozycję Notatnik z menu. Zostanie wyświetlone okno dialogowe Tworzenie notesu .
Wprowadź nazwę notesu. Na przykład
Prepare songs data
. Zmień język domyślny na SQL.Wprowadź następujące informacje w pierwszej komórce notesu:
CREATE OR REPLACE TABLE <table-name> ( artist_id STRING, artist_name STRING, duration DOUBLE, release STRING, tempo DOUBLE, time_signature DOUBLE, title STRING, year DOUBLE, processed_time TIMESTAMP ); INSERT INTO <table-name> SELECT artist_id, artist_name, duration, release, tempo, time_signature, title, year, current_timestamp() FROM <raw-songs-table-name>
Jeśli używasz Unity Catalog, zastąp
<table-name>
katalogiem, schematem i nazwą tabeli, aby zawierały przefiltrowane i przekształcone rekordy (na przykładdata_pipelines.songs_data.prepared_song_data
). W przeciwnym razie zastąp<table-name>
ciąg nazwą tabeli zawierającą przefiltrowane i przekształcone rekordy (na przykładprepared_song_data
).Zastąp
<raw-songs-table-name>
nazwą tabeli zawierającej nieprzetworzone rekordy utworów pozyskane w poprzednim kroku.Kliknij
, a następnie wybierz Uruchom komórkę.
Krok 5. Wykonywanie zapytań dotyczących przekształconych danych
W tym kroku rozszerzysz potok przetwarzania, dodając zapytania do analizowania danych utworów. Te zapytania używają przygotowanych rekordów utworzonych w poprzednim kroku.
Na pasku bocznym kliknij
Nowy i wybierz Notatnik z menu. Zostanie wyświetlone okno dialogowe Tworzenie notesu .
Wprowadź nazwę notesu. Na przykład
Analyze songs data
. Zmień język domyślny na SQL.Wprowadź następujące informacje w pierwszej komórce notesu:
-- Which artists released the most songs each year? SELECT artist_name, count(artist_name) AS num_songs, year FROM <prepared-songs-table-name> WHERE year > 0 GROUP BY artist_name, year ORDER BY num_songs DESC, year DESC
Zastąp
<prepared-songs-table-name>
nazwą tabeli zawierającej przygotowane dane. Na przykładdata_pipelines.songs_data.prepared_song_data
.Kliknij
w menu akcji komórki, wybierz Dodaj komórkę poniżej i wprowadź następującą treść w nowej komórce:
-- Find songs for your DJ list SELECT artist_name, title, tempo FROM <prepared-songs-table-name> WHERE time_signature = 4 AND tempo between 100 and 140;
Zastąp
<prepared-songs-table-name>
nazwą przygotowanej tabeli utworzonej w poprzednim kroku. Na przykładdata_pipelines.songs_data.prepared_song_data
.Aby uruchomić zapytania i wyświetlić dane wyjściowe, kliknij pozycję Uruchom wszystko.
Krok 6: Utwórz zadanie Azure Databricks do uruchomienia potoku
Możesz utworzyć przepływ pracy, aby zautomatyzować uruchamianie kroków pozyskiwania, przetwarzania i analizy danych przy użyciu zadania usługi Azure Databricks.
- W obszarze roboczym Nauka o danych i inżynierii wykonaj jedną z następujących czynności:
- Kliknij
Przepływy Pracy na pasku bocznym i kliknij
.
- Na pasku bocznym kliknij pozycję
Nowy i wybierz pozycję Zadanie.
- Kliknij
- W oknie dialogowym zadania na karcie Zadania zastąp ciąg Dodaj nazwę zadania... nazwą zadania. Na przykład "Proces utworów".
- W polu Nazwa zadania wprowadź nazwę pierwszego zadania, na przykład
Ingest_songs_data
. - W Typ wybierz typ zadania Notebook.
- W obszarze Źródło wybierz pozycję Obszar roboczy.
- W polu Ścieżka użyj przeglądarki plików, aby znaleźć notatnik do pobierania danych, a następnie kliknij Potwierdź.
- W computewybierz zasób obliczeniowy utworzony w kroku
Create a compute resource
. - Kliknij pozycję Utwórz.
- Kliknij
poniżej właśnie utworzonego zadania i wybierz Notatnik.
- W polu Nazwa zadania wprowadź nazwę zadania, na przykład
Prepare_songs_data
. - W Typie wybierz typ zadania Notebook.
- W obszarze Źródło wybierz pozycję Obszar roboczy.
- Użyj przeglądarki plików, aby znaleźć notes przygotowywania danych, kliknij nazwę notesu, a następnie kliknij przycisk Potwierdź.
- W computewybierz zasób obliczeniowy utworzony w kroku
Create a compute resource
. - Kliknij pozycję Utwórz.
- Kliknij
poniżej utworzonego zadania i wybierz Notatnik.
- W polu Nazwa zadania wprowadź nazwę zadania, na przykład
Analyze_songs_data
. - W Typie wybierz typ zadania Notebook.
- W obszarze Źródło wybierz pozycję Obszar roboczy.
- Użyj przeglądarki plików, aby znaleźć notatnik analizy danych, kliknij nazwę notatnika, a następnie kliknij Potwierdź.
- W computewybierz zasób obliczeniowy utworzony w kroku
Create a compute resource
. - Kliknij pozycję Utwórz.
- Aby uruchomić przepływ pracy, kliknij pozycję
. Aby wyświetlić szczegóły przebiegu, kliknij link w kolumnie Godzina rozpoczęcia przebiegu w widoku przebiegów zadania. Kliknij każde zadanie, aby wyświetlić szczegóły przebiegu zadania.
- Aby wyświetlić wyniki po zakończeniu przepływu pracy, kliknij ostateczne zadanie analizy danych. Zostanie wyświetlona strona Dane wyjściowe i wyświetli wyniki zapytania.
Krok 7. Planowanie zadania potoku danych
Uwaga
Aby zademonstrować użycie zadania usługi Azure Databricks do orkiestracji zaplanowanego przepływu pracy, ten przykład wprowadzający oddziela etapy pozyskiwania, przygotowywania i analizy na oddzielnych notatnikach, a każdy notatnik jest następnie używany do utworzenia zadania w ramach pracy. Jeśli wszystkie operacje przetwarzania znajdują się w jednym notesie, możesz łatwo zaplanować notes bezpośrednio z poziomu interfejsu użytkownika notesu usługi Azure Databricks. Zobacz Tworzenie zaplanowanych zadań notesu i zarządzanie nimi.
Typowym wymaganiem jest uruchomienie rury danych według harmonogramu. Aby zdefiniować harmonogram zadania, w ramach którego jest uruchamiany potok:
- Kliknij pozycję
Przepływy pracy na pasku bocznym.
- W kolumnie Nazwa kliknij nazwę zadania. Na panelu bocznym są wyświetlane szczegóły zadania.
- Kliknij pozycję Dodaj wyzwalacz na panelu Szczegóły zadania i wybierz pozycję Zaplanowane w polu Typ wyzwalacza.
- Określ okres, czas rozpoczęcia i strefę czasową. Opcjonalnie zaznacz pole wyboru Pokaż składnię Cron, aby wyświetlić i edytować harmonogram w składni Quartz Cron.
- Kliknij przycisk Zapisz.
Dowiedz się więcej
- Aby dowiedzieć się więcej na temat notesów usługi Databricks, zobacz Wprowadzenie do notesów usługi Databricks.
- Aby dowiedzieć się więcej o zadaniach usługi Azure Databricks, zobacz Co to są zadania?.
- Aby dowiedzieć się więcej o usłudze Delta Lake, zobacz Co to jest usługa Delta Lake?.
- Aby dowiedzieć się więcej na temat potoków przetwarzania danych za pomocą biblioteki DLT, zobacz Co to jest DLT?.