Tworzenie kompleksowego potoku danych w usłudze Databricks
W tym artykule przedstawiono sposób tworzenia i wdrażania kompleksowego potoku przetwarzania danych, w tym sposobu pozyskiwania danych pierwotnych, przekształcania danych i uruchamiania analiz na przetworzonych danych.
Uwaga
Chociaż w tym artykule pokazano, jak utworzyć kompletny potok danych przy użyciu notesów usługi Databricks i zadania usługi Azure Databricks do organizowania przepływu pracy, usługa Databricks zaleca używanie tabel delta Live Tables, interfejsu deklaratywnego do tworzenia niezawodnych, konserwacyjnych i testowalnych potoków przetwarzania danych.
Co to jest potok danych?
Potok danych implementuje kroki wymagane do przenoszenia danych z systemów źródłowych, przekształcania tych danych na podstawie wymagań i przechowywania danych w systemie docelowym. Potok danych obejmuje wszystkie procesy niezbędne do przekształcenia danych pierwotnych w przygotowane dane, które użytkownicy mogą wykorzystywać. Na przykład potok danych może przygotować dane, aby analitycy danych i analitycy danych mogli wyodrębnić wartość z danych za pośrednictwem analizy i raportowania.
Przepływ pracy wyodrębniania, przekształcania i ładowania (ETL) jest typowym przykładem potoku danych. W przypadku przetwarzania ETL dane są pozyskiwane z systemów źródłowych i zapisywane w obszarze przejściowym, przekształcane na podstawie wymagań (zapewnienie jakości danych, deduplikowanie rekordów itd.), a następnie zapisywane w systemie docelowym, takim jak magazyn danych lub data lake.
Kroki potoku danych
Aby ułatwić rozpoczęcie tworzenia potoków danych w usłudze Azure Databricks, przykład zawarty w tym artykule zawiera instrukcje tworzenia przepływu pracy przetwarzania danych:
- Używanie funkcji usługi Azure Databricks do eksplorowania nieprzetworzonego zestawu danych.
- Utwórz notes usługi Databricks, aby pozyskiwać nieprzetworzone dane źródłowe i zapisywać nieprzetworzone dane w tabeli docelowej.
- Utwórz notes usługi Databricks, aby przekształcić nieprzetworzone dane źródłowe i zapisać przekształcone dane w tabeli docelowej.
- Utwórz notes usługi Databricks w celu wykonywania zapytań dotyczących przekształconych danych.
- Automatyzowanie potoku danych za pomocą zadania usługi Azure Databricks.
Wymagania
- Zalogowano się do usługi Azure Databricks i w obszarze roboczym Nauka o danych & Engineering.
- Masz uprawnienia do tworzenia klastra lub dostępu do klastra.
- (Opcjonalnie) Aby opublikować tabele w wykazie aparatu Unity, należy utworzyć wykaz i schemat w wykazie aparatu Unity.
Przykład: zestaw danych z milionem piosenek
Zestaw danych używany w tym przykładzie jest podzbiorem zestawu danych Million Song Dataset, kolekcją funkcji i metadanych utworów muzyki współczesnej. Ten zestaw danych jest dostępny w przykładowych zestawach danych zawartych w obszarze roboczym usługi Azure Databricks.
Krok 1. Tworzenie klastra
Aby wykonać przetwarzanie i analizę danych w tym przykładzie, utwórz klaster w celu udostępnienia zasobów obliczeniowych potrzebnych do uruchamiania poleceń.
Uwaga
Ponieważ w tym przykładzie jest używany przykładowy zestaw danych przechowywany w systemie plików DBFS i zaleca utrwalanie tabel w wykazie aparatu Unity, należy utworzyć klaster skonfigurowany z trybem dostępu pojedynczego użytkownika. Tryb dostępu pojedynczego użytkownika zapewnia pełny dostęp do systemu plików DBFS, a jednocześnie umożliwia dostęp do katalogu aparatu Unity. Zobacz Najlepsze rozwiązania dotyczące systemu plików DBFS i wykazu aparatu Unity.
- Kliknij pozycję Obliczenia na pasku bocznym.
- Na stronie Obliczenia kliknij pozycję Utwórz klaster.
- Na stronie Nowy klaster wprowadź unikatową nazwę klastra.
- W obszarze Tryb dostępu wybierz pozycję Pojedynczy użytkownik.
- W obszarze Dostęp do pojedynczego użytkownika lub jednostki usługi wybierz nazwę użytkownika.
- Pozostaw pozostałe wartości w stanie domyślnym, a następnie kliknij pozycję Utwórz klaster.
Aby dowiedzieć się więcej o klastrach usługi Databricks, zobacz Obliczenia.
Krok 2. Eksplorowanie danych źródłowych
Aby dowiedzieć się, jak używać interfejsu usługi Azure Databricks do eksplorowania nieprzetworzonych danych źródłowych, zobacz Eksplorowanie danych źródłowych dla potoku danych. Jeśli chcesz przejść bezpośrednio do pozyskiwania i przygotowywania danych, przejdź do kroku 3. Pozyskiwanie danych pierwotnych.
Krok 3. Pozyskiwanie danych pierwotnych
W tym kroku załadujesz nieprzetworzone dane do tabeli, aby udostępnić je do dalszego przetwarzania. Aby zarządzać zasobami danych na platformie databricks, takiej jak tabele, usługa Databricks zaleca katalog aparatu Unity. Jeśli jednak nie masz uprawnień do tworzenia wymaganego wykazu i schematu do publikowania tabel w wykazie aparatu Unity, nadal możesz wykonać następujące kroki, publikując tabele w magazynie metadanych Hive.
Aby pozyskiwać dane, usługa Databricks zaleca korzystanie z modułu automatycznego ładowania. Automatycznie moduł ładujący automatycznie wykrywa i przetwarza nowe pliki w miarę ich przybycia do magazynu obiektów w chmurze.
Automatyczne ładowanie można skonfigurować tak, aby automatycznie wykrywał schemat załadowanych danych, umożliwiając inicjowanie tabel bez jawnego deklarowania schematu danych i rozwijania schematu tabeli w miarę wprowadzania nowych kolumn. Eliminuje to konieczność ręcznego śledzenia i stosowania zmian schematu w czasie. Usługa Databricks zaleca wnioskowanie schematu podczas korzystania z automatycznego modułu ładującego. Jednak jak pokazano w kroku eksploracji danych, dane utworów nie zawierają informacji nagłówka. Ponieważ nagłówek nie jest przechowywany z danymi, należy jawnie zdefiniować schemat, jak pokazano w następnym przykładzie.
Na pasku bocznym kliknij pozycję Nowy i wybierz pozycję Notes z menu. Zostanie wyświetlone okno dialogowe Tworzenie notesu .
Wprowadź nazwę notesu, na przykład
Ingest songs data
. Domyślnie:- Język Python jest wybranym językiem.
- Notes jest dołączony do ostatniego użytego klastra. W takim przypadku klaster utworzony w kroku 1: Tworzenie klastra.
Wprowadź następujące informacje w pierwszej komórce notesu:
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define variables used in the code below file_path = "/databricks-datasets/songs/data-001/" table_name = "<table-name>" checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data" schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path) .writeStream .option("checkpointLocation", checkpoint_path) .trigger(availableNow=True) .toTable(table_name) )
Jeśli używasz wykazu aparatu Unity, zastąp
<table-name>
ciąg katalogiem, schematem i nazwą tabeli, aby zawierać pozyskane rekordy (na przykładdata_pipelines.songs_data.raw_song_data
). W przeciwnym razie zastąp<table-name>
ciąg nazwą tabeli, aby zawierała pozyskane rekordy, na przykładraw_song_data
.Zastąp
<checkpoint-path>
ciąg ścieżką do katalogu w systemie plików DBFS, aby zachować pliki punktów kontrolnych, na przykład/tmp/pipeline_get_started/_checkpoint/song_data
.Kliknij pozycję , a następnie wybierz pozycję Uruchom komórkę. W tym przykładzie zdefiniowano schemat danych przy użyciu informacji z
README
elementu , pozyskuje dane utworów ze wszystkich plików zawartych wfile_path
pliku i zapisuje dane w tabeli określonej przeztable_name
element .
Krok 4. Przygotowywanie danych pierwotnych
Aby przygotować nieprzetworzone dane do analizy, poniższe kroki przekształcają nieprzetworzone dane utworów przez odfiltrowanie niepotrzebnych kolumn i dodanie nowego pola zawierającego znacznik czasu tworzenia nowego rekordu.
Na pasku bocznym kliknij pozycję Nowy i wybierz pozycję Notes z menu. Zostanie wyświetlone okno dialogowe Tworzenie notesu .
Wprowadź nazwę notesu. Na przykład
Prepare songs data
. Zmień język domyślny na SQL.Wprowadź następujące informacje w pierwszej komórce notesu:
CREATE OR REPLACE TABLE <table-name> ( artist_id STRING, artist_name STRING, duration DOUBLE, release STRING, tempo DOUBLE, time_signature DOUBLE, title STRING, year DOUBLE, processed_time TIMESTAMP ); INSERT INTO <table-name> SELECT artist_id, artist_name, duration, release, tempo, time_signature, title, year, current_timestamp() FROM <raw-songs-table-name>
Jeśli używasz wykazu aparatu Unity, zastąp
<table-name>
ciąg katalogiem, schematem i nazwą tabeli, aby zawierać przefiltrowane i przekształcone rekordy (na przykładdata_pipelines.songs_data.prepared_song_data
). W przeciwnym razie zastąp<table-name>
ciąg nazwą tabeli zawierającą przefiltrowane i przekształcone rekordy (na przykładprepared_song_data
).Zastąp
<raw-songs-table-name>
ciąg nazwą tabeli zawierającej nieprzetworzone rekordy utworów pozyskane w poprzednim kroku.Kliknij pozycję , a następnie wybierz pozycję Uruchom komórkę.
Krok 5. Wykonywanie zapytań dotyczących przekształconych danych
W tym kroku rozszerzysz potok przetwarzania, dodając zapytania do analizowania danych utworów. Te zapytania używają przygotowanych rekordów utworzonych w poprzednim kroku.
Na pasku bocznym kliknij pozycję Nowy i wybierz pozycję Notes z menu. Zostanie wyświetlone okno dialogowe Tworzenie notesu .
Wprowadź nazwę notesu. Na przykład
Analyze songs data
. Zmień język domyślny na SQL.Wprowadź następujące informacje w pierwszej komórce notesu:
-- Which artists released the most songs each year? SELECT artist_name, count(artist_name) AS num_songs, year FROM <prepared-songs-table-name> WHERE year > 0 GROUP BY artist_name, year ORDER BY num_songs DESC, year DESC
Zastąp
<prepared-songs-table-name>
ciąg nazwą tabeli zawierającej przygotowane dane. Na przykładdata_pipelines.songs_data.prepared_song_data
.Kliknij menu akcji komórki, wybierz pozycję Dodaj komórkę poniżej i wprowadź następujące polecenie w nowej komórce:
-- Find songs for your DJ list SELECT artist_name, title, tempo FROM <prepared-songs-table-name> WHERE time_signature = 4 AND tempo between 100 and 140;
Zastąp
<prepared-songs-table-name>
ciąg nazwą przygotowanej tabeli utworzonej w poprzednim kroku. Na przykładdata_pipelines.songs_data.prepared_song_data
.Aby uruchomić zapytania i wyświetlić dane wyjściowe, kliknij pozycję Uruchom wszystko.
Krok 6. Tworzenie zadania usługi Azure Databricks w celu uruchomienia potoku
Możesz utworzyć przepływ pracy, aby zautomatyzować uruchamianie kroków pozyskiwania, przetwarzania i analizy danych przy użyciu zadania usługi Azure Databricks.
- W obszarze roboczym Nauka o danych i inżynierii wykonaj jedną z następujących czynności:
- Kliknij pozycję Przepływy pracy na pasku bocznym i kliknij pozycję .
- Na pasku bocznym kliknij pozycję Nowy i wybierz pozycję Zadanie.
- W oknie dialogowym zadania na karcie Zadania zastąp ciąg Dodaj nazwę zadania... nazwą zadania. Na przykład "Przepływ pracy utworów".
- W polu Nazwa zadania wprowadź nazwę pierwszego zadania, na przykład
Ingest_songs_data
. - W polu Typ wybierz typ zadania Notes .
- W obszarze Źródło wybierz pozycję Obszar roboczy.
- Użyj przeglądarki plików, aby znaleźć notes pozyskiwania danych, kliknij nazwę notesu, a następnie kliknij przycisk Potwierdź.
- W obszarze Klaster wybierz pozycję Shared_job_cluster lub klaster utworzony w
Create a cluster
kroku. - Kliknij pozycję Utwórz.
- Kliknij poniżej właśnie utworzonego zadania i wybierz pozycję Notes.
- W polu Nazwa zadania wprowadź nazwę zadania, na przykład
Prepare_songs_data
. - W polu Typ wybierz typ zadania Notes .
- W obszarze Źródło wybierz pozycję Obszar roboczy.
- Użyj przeglądarki plików, aby znaleźć notes przygotowywania danych, kliknij nazwę notesu, a następnie kliknij przycisk Potwierdź.
- W obszarze Klaster wybierz pozycję Shared_job_cluster lub klaster utworzony w
Create a cluster
kroku. - Kliknij pozycję Utwórz.
- Kliknij poniżej właśnie utworzonego zadania i wybierz pozycję Notes.
- W polu Nazwa zadania wprowadź nazwę zadania, na przykład
Analyze_songs_data
. - W polu Typ wybierz typ zadania Notes .
- W obszarze Źródło wybierz pozycję Obszar roboczy.
- Użyj przeglądarki plików, aby znaleźć notes analizy danych, kliknij nazwę notesu, a następnie kliknij przycisk Potwierdź.
- W obszarze Klaster wybierz pozycję Shared_job_cluster lub klaster utworzony w
Create a cluster
kroku. - Kliknij pozycję Utwórz.
- Aby uruchomić przepływ pracy, kliknij pozycję . Aby wyświetlić szczegóły przebiegu, kliknij link w kolumnie Godzina rozpoczęcia przebiegu w widoku przebiegów zadania. Kliknij każde zadanie, aby wyświetlić szczegóły przebiegu zadania.
- Aby wyświetlić wyniki po zakończeniu przepływu pracy, kliknij ostateczne zadanie analizy danych. Zostanie wyświetlona strona Dane wyjściowe i wyświetli wyniki zapytania.
Krok 7. Planowanie zadania potoku danych
Uwaga
Aby zademonstrować użycie zadania usługi Azure Databricks do organizowania zaplanowanego przepływu pracy, ten przykład wprowadzający oddziela etapy pozyskiwania, przygotowywania i analizy w oddzielnych notesach, a każdy notes jest następnie używany do tworzenia zadania w zadaniu. Jeśli wszystkie operacje przetwarzania znajdują się w jednym notesie, możesz łatwo zaplanować notes bezpośrednio z poziomu interfejsu użytkownika notesu usługi Azure Databricks. Zobacz Tworzenie zaplanowanych zadań notesu i zarządzanie nimi.
Typowym wymaganiem jest uruchomienie potoku danych zgodnie z harmonogramem. Aby zdefiniować harmonogram zadania, w ramach którego jest uruchamiany potok:
- Kliknij pozycję Przepływy pracy na pasku bocznym.
- W kolumnie Nazwa kliknij nazwę zadania. Na panelu bocznym są wyświetlane szczegóły zadania.
- Kliknij pozycję Dodaj wyzwalacz na panelu Szczegóły zadania i wybierz pozycję Zaplanowane w polu Typ wyzwalacza.
- Określ okres, czas rozpoczęcia i strefę czasową. Opcjonalnie zaznacz pole wyboru Pokaż składnię Cron, aby wyświetlić i edytować harmonogram w składni Kron kwarcu.
- Kliknij przycisk Zapisz.
Dowiedz się więcej
- Aby dowiedzieć się więcej na temat notesów usługi Databricks, zobacz Wprowadzenie do notesów usługi Databricks.
- Aby dowiedzieć się więcej o zadaniach usługi Azure Databricks, zobacz Co to są zadania usługi Databricks?.
- Aby dowiedzieć się więcej o usłudze Delta Lake, zobacz Co to jest usługa Delta Lake?.
- Aby dowiedzieć się więcej na temat potoków przetwarzania danych za pomocą tabel delta Live Tables, zobacz Co to jest delta live tables?.