Kurz: Spuštění kompletního analytického kanálu Lakehouse
V tomto kurzu se dozvíte, jak nastavit komplexní analytický kanál pro Azure Databricks Lakehouse.
Důležité
Tento kurz používá interaktivní poznámkové bloky k dokončení běžných úloh ETL v Pythonu v clusterech s podporou katalogu Unity. Pokud nepoužíváte katalog Unity, přečtěte si téma Spuštění první úlohy ETL v Azure Databricks.
Úkoly v tomto kurzu
Na konci tohoto článku se budete cítit pohodlně:
- Spuštění výpočetního clusteru s povoleným katalogem Unity
- Vytvoření poznámkového bloku Databricks
- Zápis a čtení dat z externího umístění katalogu Unity
- Konfigurace přírůstkového příjmu dat do tabulky katalogu Unity pomocí automatického zavaděče
- Spouštění buněk poznámkového bloku pro zpracování, dotazování a náhled dat
- Plánování poznámkového bloku jako úlohy Databricks
- Dotazování tabulek katalogu Unity z Databricks SQL
Azure Databricks poskytuje sadu nástrojů připravených pro produkční prostředí, které odborníkům na data umožňují rychle vyvíjet a nasazovat kanály extrakce, transformace a načítání (ETL). Katalog Unity umožňuje správci dat konfigurovat a zabezpečit přihlašovací údaje úložiště, externí umístění a databázové objekty pro uživatele v celé organizaci. Databricks SQL umožňuje analytikům spouštět dotazy SQL na stejné tabulky, které se používají v produkčních úlohách ETL, což umožňuje ve velkém měřítku business intelligence v reálném čase.
K sestavení kanálů ETL můžete také použít rozdílové živé tabulky. Databricks vytvořila dynamické tabulky Delta, aby se snížila složitost sestavování, nasazování a údržby produkčních kanálů ETL. Viz kurz: Spuštění prvního kanálu dynamických tabulek Delta.
Požadavky
Poznámka:
Pokud nemáte oprávnění ke kontrole clusteru, můžete většinu následujících kroků dokončit, pokud máte přístup ke clusteru.
Krok 1: Vytvoření clusteru
Pokud chcete provádět průzkumnou analýzu dat a přípravu dat, vytvořte cluster, který poskytuje výpočetní prostředky potřebné ke spouštění příkazů.
- Na bočním panelu klikněte na Výpočty.
- Na bočním panelu klikněte na Nový a pak vyberte Cluster. Otevře se stránka Nový cluster nebo výpočetní prostředky.
- Zadejte jedinečný název clusteru.
- Vyberte přepínač s jedním uzlem.
- V rozevíracím seznamu Režim přístupu vyberte jednoho uživatele.
- Ujistěte se, že je vaše e-mailová adresa viditelná v poli Jeden uživatel .
- Vyberte požadovanou verzi modulu runtime Databricks 11.1 nebo vyšší, abyste mohli použít katalog Unity.
- Kliknutím na Vytvořit výpočetní prostředky vytvořte cluster.
Další informace o clusterech Databricks najdete v tématu Výpočty.
Krok 2: Vytvoření poznámkového bloku Databricks
Chcete-li vytvořit poznámkový blok v pracovním prostoru, klepněte na tlačítko Nový na bočním panelu a potom klepněte na příkaz Poznámkový blok. V pracovním prostoru se otevře prázdný poznámkový blok.
Další informace o vytváření a správě poznámkových bloků najdete v tématu Správa poznámkových bloků.
Krok 3: Zápis a čtení dat z externího umístění spravovaného katalogem Unity
Databricks doporučuje používat automatický zavaděč pro přírůstkové příjem dat. Auto Loader automaticky rozpozná a zpracuje nové soubory při jejich doručení do cloudového úložiště objektů.
Pomocí katalogu Unity můžete spravovat zabezpečený přístup k externím umístěním. Uživatelé nebo instanční objekty s oprávněními READ FILES
k externímu umístění můžou k příjmu dat použít automatický zavaděč.
Za normálních okolností data přijdou do externího umístění kvůli zápisům z jiných systémů. V této ukázce můžete simulovat doručení dat tím, že do externího umístění zapíšete soubory JSON.
Zkopírujte následující kód do buňky poznámkového bloku. Nahraďte hodnotu catalog
řetězce názvem katalogu a USE CATALOG
oprávněnímiCREATE CATALOG
. Nahraďte hodnotu external_location
řetězce cestou pro externí umístění znakem READ FILES
, WRITE FILES
a CREATE EXTERNAL TABLE
oprávněními.
Externí umístění se dají definovat jako celý kontejner úložiště, ale často odkazují na adresář vnořený do kontejneru.
Správný formát cesty k externímu umístění je "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location"
.
external_location = "<your-external-location>"
catalog = "<your-catalog>"
dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
display(dbutils.fs.head(f"{external_location}/filename.txt"))
dbutils.fs.rm(f"{external_location}/filename.txt")
display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
Spuštění této buňky by mělo vytisknout řádek, který čte 12 bajtů, vytiskne řetězec "Hello world!" a zobrazí všechny databáze, které jsou v katalogu. Pokud se vám nedaří tuto buňku spustit, ověřte, že jste v pracovním prostoru s povoleným katalogem Unity, a požádejte správce pracovního prostoru o správná oprávnění k dokončení tohoto kurzu.
Níže uvedený kód Pythonu používá vaši e-mailovou adresu k vytvoření jedinečné databáze v zadaném katalogu a jedinečného umístění úložiště v externím umístění. Spuštěním této buňky odeberete všechna data přidružená k tomuto kurzu, což vám umožní spustit tento příklad idempotentním způsobem. Třída je definována a vytvořena instance, kterou použijete k simulaci dávek dat přicházejících z připojeného systému do vašeho zdrojového externího umístění.
Zkopírujte tento kód do nové buňky v poznámkovém bloku a spusťte ho a nakonfigurujte prostředí.
Poznámka:
Proměnné definované v tomto kódu by vám měly umožnit bezpečné spuštění bez rizika konfliktu s existujícími prostředky pracovního prostoru nebo jinými uživateli. Omezená oprávnění k síti nebo úložišti způsobí chyby při provádění tohoto kódu; Požádejte správce pracovního prostoru o řešení těchto omezení.
from pyspark.sql.functions import col
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)
# Define a class to load batches of data to source
class LoadData:
def __init__(self, source):
self.source = source
def get_date(self):
try:
df = spark.read.format("json").load(source)
except:
return "2016-01-01"
batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
if batch_date.month == 3:
raise Exception("Source data exhausted")
return batch_date
def get_batch(self, batch_date):
return (
spark.table("samples.nyctaxi.trips")
.filter(col("tpep_pickup_datetime").cast("date") == batch_date)
)
def write_batch(self, batch):
batch.write.format("json").mode("append").save(self.source)
def land_batch(self):
batch_date = self.get_date()
batch = self.get_batch(batch_date)
self.write_batch(batch)
RawData = LoadData(source)
Teď můžete získat dávku dat zkopírováním následujícího kódu do buňky a jeho spuštěním. Tuto buňku můžete spustit ručně až 60krát a aktivovat tak nové doručení dat.
RawData.land_batch()
Krok 4: Konfigurace automatického zavaděče pro příjem dat do katalogu Unity
Databricks doporučuje ukládat data pomocí Delta Lake. Delta Lake je opensourcová vrstva úložiště, která poskytuje transakce ACID a umožňuje datové jezero. Delta Lake je výchozí formát pro tabulky vytvořené v Databricks.
Pokud chcete nakonfigurovat automatický zavaděč pro příjem dat do tabulky Katalogu Unity, zkopírujte a vložte následující kód do prázdné buňky v poznámkovém bloku:
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(source)
.select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.option("mergeSchema", "true")
.toTable(table))
Další informace o automatickém zavaděči najdete v tématu Co je automatický zavaděč?.
Další informace o strukturovaném streamování pomocí katalogu Unity najdete v tématu Použití katalogu Unity se strukturovaným streamováním.
Krok 5: Zpracování dat a interakce s nimi
Poznámkové bloky spouštějí logické buňky po buňce. Pomocí těchto kroků spusťte logiku v buňce:
Pokud chcete buňku, kterou jste dokončili v předchozím kroku, spustit, vyberte buňku a stiskněte SHIFT+ENTER.
Pokud chcete zadat dotaz na tabulku, kterou jste právě vytvořili, zkopírujte následující kód a vložte ho do prázdné buňky a stisknutím kombinace kláves SHIFT+ENTER buňku spusťte.
df = spark.read.table(table)
Pokud chcete zobrazit náhled dat v datovém rámci, zkopírujte a vložte následující kód do prázdné buňky a potom buňku spusťte stisknutím kombinace kláves SHIFT+ENTER .
display(df)
Další informace o interaktivních možnostech vizualizace dat najdete v tématu Vizualizace v poznámkových blocích Databricks.
Krok 6: Naplánování úlohy
Poznámkové bloky Databricks můžete spustit jako produkční skripty tak, že je přidáte jako úlohu do úlohy Databricks. V tomto kroku vytvoříte novou úlohu, kterou můžete aktivovat ručně.
Naplánování poznámkového bloku jako úkolu:
- Na pravé straně záhlaví klikněte na Plán .
- Zadejte jedinečný název pro název úlohy.
- Klikněte na Ruční.
- V rozevíracím seznamu Cluster vyberte cluster, který jste vytvořili v kroku 1.
- Klikněte na Vytvořit.
- V zobrazeném okně klikněte na Spustit.
- Pokud chcete zobrazit výsledky spuštění úlohy, klikněte na ikonu vedle časového razítka posledního spuštění .
Další informace o úlohách najdete v tématu Co jsou úlohy Databricks?
Krok 7: Dotazování tabulky z Databricks SQL
Každý, kdo má USE CATALOG
oprávnění k aktuálnímu katalogu, USE SCHEMA
oprávnění k aktuálnímu schématu a SELECT
oprávnění v tabulce, může dotazovat obsah tabulky z preferovaného rozhraní Databricks API.
Ke spouštění dotazů v Databricks SQL potřebujete přístup ke spuštěné službě SQL Warehouse.
Tabulka, kterou jste vytvořili dříve v tomto kurzu, má název target_table
. Můžete se na něj dotazovat pomocí katalogu, který jste zadali v první buňce a databázi s paternem e2e_lakehouse_<your-username>
. Průzkumníka katalogu můžete použít k vyhledání datových objektů, které jste vytvořili.
Další integrace
Další informace o integracích a nástrojích pro přípravu dat pomocí Azure Databricks: