Vytvoření kompletního datového kanálu v Databricks
V tomto článku se dozvíte, jak vytvořit a nasadit komplexní kanál zpracování dat, včetně toho, jak ingestovat nezpracovaná data, transformovat data a spouštět analýzy na zpracovaných datech.
Poznámka:
Přestože tento článek ukazuje, jak vytvořit úplný datový kanál pomocí poznámkových bloků Databricks a úlohy Azure Databricks pro orchestraci pracovního postupu, Databricks doporučuje používat delta živé tabulky, deklarativní rozhraní pro vytváření spolehlivých, udržovatelných a testovatelných kanálů zpracování dat.
Co je datový kanál?
Datový kanál implementuje kroky potřebné k přesunu dat ze zdrojových systémů, transformaci dat na základě požadavků a uložení dat do cílového systému. Datový kanál zahrnuje všechny procesy potřebné k převodu nezpracovaných dat na připravená data, která můžou uživatelé využívat. Datový kanál může například připravit data, aby datoví analytici a datoví vědci mohli extrahovat hodnotu z dat prostřednictvím analýzy a generování sestav.
Běžným příkladem datového kanálu je pracovní postup extrakce, transformace a načítání (ETL). Při zpracování ETL se data ingestují ze zdrojových systémů a zapisují se do pracovní oblasti, transformují se na základě požadavků (zajištění kvality dat, odstranění duplicitních dat atd.) a následně se zapisují do cílového systému, jako je datový sklad nebo datové jezero.
Kroky datového kanálu
Abyste mohli začít vytvářet datové kanály v Azure Databricks, příklad uvedený v tomto článku vás provede vytvořením pracovního postupu zpracování dat:
- Pomocí funkcí Azure Databricks můžete prozkoumat nezpracovanou datovou sadu.
- Vytvořte poznámkový blok Databricks pro příjem nezpracovaných zdrojových dat a zapište nezpracovaná data do cílové tabulky.
- Vytvořte poznámkový blok Databricks, který transformuje nezpracovaná zdrojová data a zapíše transformovaná data do cílové tabulky.
- Vytvořte poznámkový blok Databricks pro dotazování transformovaných dat.
- Automatizujte datový kanál pomocí úlohy Azure Databricks.
Požadavky
- Jste přihlášení k Azure Databricks a v pracovním prostoru Datová Věda & Engineering.
- Máte oprávnění k vytvoření clusteru nebo přístupu ke clusteru.
- (Volitelné) Pokud chcete publikovat tabulky do katalogu Unity, musíte vytvořit katalog a schéma v katalogu Unity.
Příklad: Milion datová sada Song
Datová sada použitá v tomto příkladu je podmnožinou sady million Song Dataset, kolekce funkcí a metadat pro současné hudební skladby. Tato datová sada je dostupná v ukázkových datových sadách zahrnutých v pracovním prostoru Azure Databricks.
Krok 1: Vytvoření clusteru
Pokud chcete provést zpracování a analýzu dat v tomto příkladu, vytvořte cluster, který poskytuje výpočetní prostředky potřebné ke spuštění příkazů.
Poznámka:
Vzhledem k tomu, že tento příklad používá ukázkovou datovou sadu uloženou v DBFS a doporučuje zachovat tabulky v katalogu Unity, vytvoříte cluster nakonfigurovaný s režimem přístupu jednoho uživatele. Režim přístupu jednoho uživatele poskytuje úplný přístup k DBFS a zároveň umožňuje přístup ke katalogu Unity. Podívejte se na osvědčené postupy pro DBFS a Katalog Unity.
- Na bočním panelu klikněte na Výpočty .
- Na stránce Compute klikněte na Vytvořit cluster.
- Na stránce Nový cluster zadejte jedinečný název clusteru.
- V režimu přístupu vyberte Jednoho uživatele.
- V přístupu k jednomu uživateli nebo instančnímu objektu vyberte své uživatelské jméno.
- Ponechte zbývající hodnoty ve výchozím stavu a klikněte na Vytvořit cluster.
Další informace o clusterech Databricks najdete v tématu Výpočty.
Krok 2: Prozkoumání zdrojových dat
Informace o použití rozhraní Azure Databricks k prozkoumání nezpracovaných zdrojových dat najdete v tématu Prozkoumání zdrojových dat datového kanálu. Pokud chcete přejít přímo na ingestování a přípravu dat, pokračujte krokem 3: Ingestování nezpracovaných dat.
Krok 3: Ingestování nezpracovaných dat
V tomto kroku načtete nezpracovaná data do tabulky, aby byla k dispozici pro další zpracování. Ke správě datových prostředků na platformě Databricks, jako jsou tabulky, doporučuje Databricks katalog Unity. Pokud ale nemáte oprávnění k vytvoření požadovaného katalogu a schématu pro publikování tabulek do katalogu Unity, můžete i nadále provádět následující kroky publikováním tabulek do metastoru Hive.
K ingestování dat doporučuje Databricks používat automatický zavaděč. Auto Loader automaticky rozpozná a zpracuje nové soubory při jejich doručení do cloudového úložiště objektů.
Automatický zavaděč můžete nakonfigurovat tak, aby automaticky rozpoznal schéma načtených dat, což umožňuje inicializovat tabulky bez explicitního deklarování schématu dat a vyvíjet schéma tabulky při zavádění nových sloupců. To eliminuje potřebu ručního sledování a použití změn schématu v průběhu času. Databricks doporučuje odvozování schématu při použití automatického zavaděče. Jak je ale vidět v kroku zkoumání dat, data skladeb neobsahují informace hlavičky. Protože záhlaví není uloženo s daty, budete muset explicitně definovat schéma, jak je znázorněno v dalším příkladu.
Na bočním panelu klikněte na Nový a v nabídce vyberte Poznámkový blok. Zobrazí se dialogové okno Vytvořit poznámkový blok .
Zadejte název poznámkového bloku,
Ingest songs data
například . Standardně:- Python je vybraný jazyk.
- Poznámkový blok je připojený k poslednímu použitému clusteru. V tomto případě cluster, který jste vytvořili v kroku 1: Vytvoření clusteru.
Do první buňky poznámkového bloku zadejte následující:
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define variables used in the code below file_path = "/databricks-datasets/songs/data-001/" table_name = "<table-name>" checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data" schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path) .writeStream .option("checkpointLocation", checkpoint_path) .trigger(availableNow=True) .toTable(table_name) )
Pokud používáte Katalog Unity, nahraďte
<table-name>
ho názvem katalogu, schématu a tabulky, který bude obsahovat ingestované záznamy (napříkladdata_pipelines.songs_data.raw_song_data
). V opačném případě nahraďte<table-name>
názvem tabulky, která bude obsahovat ingestované záznamy,raw_song_data
například .Nahraďte
<checkpoint-path>
cestou k adresáři v DBFS za účelem údržby souborů kontrolních bodů,/tmp/pipeline_get_started/_checkpoint/song_data
například .Klikněte na položku a vyberte Spustit buňku. Tento příklad definuje schéma dat pomocí informací z
README
, ingestuje skladby data ze všech souborů obsažených vfile_path
a zapíše data do tabulky určenétable_name
.
Krok 4: Příprava nezpracovaných dat
Pokud chcete připravit nezpracovaná data pro analýzu, následující kroky transformují nezpracovaná data skladeb vyfiltrováním nepotřebných sloupců a přidáním nového pole obsahujícího časové razítko pro vytvoření nového záznamu.
Na bočním panelu klikněte na Nový a v nabídce vyberte Poznámkový blok. Zobrazí se dialogové okno Vytvořit poznámkový blok .
Zadejte název poznámkového bloku. Například
Prepare songs data
. Změňte výchozí jazyk na SQL.Do první buňky poznámkového bloku zadejte následující:
CREATE OR REPLACE TABLE <table-name> ( artist_id STRING, artist_name STRING, duration DOUBLE, release STRING, tempo DOUBLE, time_signature DOUBLE, title STRING, year DOUBLE, processed_time TIMESTAMP ); INSERT INTO <table-name> SELECT artist_id, artist_name, duration, release, tempo, time_signature, title, year, current_timestamp() FROM <raw-songs-table-name>
Pokud používáte Katalog Unity, nahraďte
<table-name>
ho názvem katalogu, schématu a tabulky, který bude obsahovat filtrované a transformované záznamy (napříkladdata_pipelines.songs_data.prepared_song_data
). V opačném případě nahraďte<table-name>
názvem tabulky, která bude obsahovat filtrované a transformované záznamy (napříkladprepared_song_data
).Nahraďte
<raw-songs-table-name>
názvem tabulky, která obsahuje nezpracované záznamy skladeb přijatých v předchozím kroku.Klikněte na položku a vyberte Spustit buňku.
Krok 5: Dotazování transformovaných dat
V tomto kroku rozšíříte kanál zpracování přidáním dotazů pro analýzu dat skladeb. Tyto dotazy používají připravené záznamy vytvořené v předchozím kroku.
Na bočním panelu klikněte na Nový a v nabídce vyberte Poznámkový blok. Zobrazí se dialogové okno Vytvořit poznámkový blok .
Zadejte název poznámkového bloku. Například
Analyze songs data
. Změňte výchozí jazyk na SQL.Do první buňky poznámkového bloku zadejte následující:
-- Which artists released the most songs each year? SELECT artist_name, count(artist_name) AS num_songs, year FROM <prepared-songs-table-name> WHERE year > 0 GROUP BY artist_name, year ORDER BY num_songs DESC, year DESC
Nahraďte
<prepared-songs-table-name>
názvem tabulky obsahující připravená data. Napříkladdata_pipelines.songs_data.prepared_song_data
.Klikněte v nabídce akcí buňky, vyberte Přidat buňku pod a do nové buňky zadejte následující:
-- Find songs for your DJ list SELECT artist_name, title, tempo FROM <prepared-songs-table-name> WHERE time_signature = 4 AND tempo between 100 and 140;
Nahraďte
<prepared-songs-table-name>
názvem připravené tabulky vytvořené v předchozím kroku. Napříkladdata_pipelines.songs_data.prepared_song_data
.Pokud chcete spouštět dotazy a zobrazit výstup, klikněte na Spustit vše.
Krok 6: Vytvoření úlohy Azure Databricks pro spuštění kanálu
Pomocí úlohy Azure Databricks můžete vytvořit pracovní postup pro automatizaci spouštění kroků pro příjem, zpracování a analýzu dat.
- V pracovním prostoru Datová Věda & Engineering udělejte jednu z těchto věcí:
- Na bočním panelu klikněte na Pracovní postupy a klikněte na .
- Na bočním panelu klikněte na Nový a vyberte Úloha.
- V dialogovém okně úkolu na kartě Úkoly nahraďte možnost Přidat název úlohy... názvem vaší úlohy. Například "Pracovní postup Skladby".
- Do pole Název úkolu zadejte název prvního úkolu,
Ingest_songs_data
například . - V části Typ vyberte typ úlohy Poznámkový blok .
- Ve zdroji vyberte Pracovní prostor.
- Pomocí prohlížeče souborů vyhledejte poznámkový blok pro příjem dat, klikněte na název poznámkového bloku a klikněte na Potvrdit.
- V clusteru vyberte Shared_job_cluster nebo cluster, který jste vytvořili v
Create a cluster
kroku. - Klikněte na Vytvořit.
- Klikněte pod úkol, který jste právě vytvořili, a vyberte Poznámkový blok.
- Do pole Název úkolu zadejte název úkolu,
Prepare_songs_data
například . - V části Typ vyberte typ úlohy Poznámkový blok .
- Ve zdroji vyberte Pracovní prostor.
- V prohlížeči souborů vyhledejte poznámkový blok pro přípravu dat, klikněte na název poznámkového bloku a klikněte na Potvrdit.
- V clusteru vyberte Shared_job_cluster nebo cluster, který jste vytvořili v
Create a cluster
kroku. - Klikněte na Vytvořit.
- Klikněte pod úkol, který jste právě vytvořili, a vyberte Poznámkový blok.
- Do pole Název úkolu zadejte název úkolu,
Analyze_songs_data
například . - V části Typ vyberte typ úlohy Poznámkový blok .
- Ve zdroji vyberte Pracovní prostor.
- Pomocí prohlížeče souborů vyhledejte poznámkový blok analýzy dat, klikněte na název poznámkového bloku a klikněte na Potvrdit.
- V clusteru vyberte Shared_job_cluster nebo cluster, který jste vytvořili v
Create a cluster
kroku. - Klikněte na Vytvořit.
- Chcete-li spustit pracovní postup, klikněte na tlačítko . Pokud chcete zobrazit podrobnosti o spuštění, klikněte na odkaz ve sloupci Čas zahájení spuštění spuštění v zobrazení spuštění úlohy. Kliknutím na každou úlohu zobrazíte podrobnosti o spuštění úlohy.
- Pokud chcete zobrazit výsledky po dokončení pracovního postupu, klikněte na konečný úkol analýzy dat. Zobrazí se stránka Výstup a zobrazí výsledky dotazu.
Krok 7: Naplánování úlohy datového kanálu
Poznámka:
Abychom si ukázali použití úlohy Azure Databricks k orchestraci naplánovaného pracovního postupu, tento příklad začínáme odděluje kroky příjmu dat, přípravy a analýzy do samostatných poznámkových bloků a každý poznámkový blok se pak použije k vytvoření úkolu v úloze. Pokud je veškeré zpracování obsažené v jednom poznámkovém bloku, můžete poznámkový blok snadno naplánovat přímo z uživatelského rozhraní poznámkového bloku Azure Databricks. Viz Vytvoření a správa naplánovaných úloh poznámkového bloku.
Běžným požadavkem je naplánované spuštění datového kanálu. Definování plánu úlohy, která kanál spouští:
- Na bočním panelu klikněte na Pracovní postupy.
- Ve sloupci Název klikněte na název úlohy. Na bočním panelu se zobrazí podrobnosti o úloze.
- Na panelu Podrobnosti úlohy klikněte na Tlačítko Přidat aktivační událost a vyberte Typ triggeru Naplánovano.
- Zadejte období, počáteční čas a časové pásmo. Volitelně zaškrtněte políčko Zobrazit syntaxi Cron a zobrazte a upravte plán v syntaxi Quartz Cron.
- Klikněte na Uložit.
Další informace
- Další informace o poznámkových blocích Databricks najdete v tématu Úvod do poznámkových bloků Databricks.
- Další informace o úlohách Azure Databricks najdete v tématu Co jsou úlohy Databricks?
- Další informace o Delta Lake najdete v tématu Co je Delta Lake?
- Další informace o kanálech zpracování dat pomocí rozdílových živých tabulek najdete v tématu Co je Delta Live Tables?