Spuštění první úlohy strukturovaného streamování
Tento článek obsahuje příklady kódu a vysvětlení základních konceptů nezbytných ke spuštění prvních dotazů strukturovaného streamování v Azure Databricks. Strukturované streamování můžete použít pro úlohy téměř v reálném čase a přírůstkové zpracování.
Strukturované streamování je jednou z několika technologií, které pohání streamování tables v Delta Live pomocí Tables. Databricks doporučuje používat delta live Tables pro všechny nové úlohy ETL, ingestování a strukturovaného streamování. Podívejte se na Co je Delta Live?Tables.
Poznámka:
I když Delta Live Tables poskytuje mírně upravenou syntaxi pro deklaraci streamování tables, obecná syntaxe konfigurace čtení a transformací streamování platí pro všechny případy použití streamování v Azure Databricks. Delta Live Tables také zjednodušuje streamování tím, že spravuje informace o stavu, metadata a řadu konfigurací.
Čtení streamovaných dat z úložiště objektů pomocí automatického zavaděče
Následující příklad ukazuje načtení dat JSON pomocí automatického zavaděče, který používá cloudFiles
k označení formátu a možností. Možnost schemaLocation
umožňuje schema odvozovat a vyvíjet. Do buňky poznámkového bloku Databricks vložte následující kód a spusťte buňku, aby se vytvořil datový rámec streamování s názvem raw_df
:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Stejně jako jiné operace čtení v Azure Databricks se při konfiguraci streamovaného čtení ve skutečnosti nenačítají data. Před zahájením datového proudu je nutné aktivovat akci s daty.
Poznámka:
Volání display()
streamovaného datového rámce spustí úlohu streamování. U většiny případů použití strukturovaného streamování by akce, která aktivuje datový proud, měla zapisovat data do jímky. Viz aspekty produkce strukturovaného streamování.
Provedení transformace streamování
Strukturované streamování podporuje většinu transformací, které jsou k dispozici v Azure Databricks a Spark SQL. Modely MLflow můžete dokonce načíst jako UDF a vytvářet předpovědi streamování jako transformaci.
Následující příklad kódu dokončí jednoduchou transformaci pro obohacení přijatých dat JSON dalšími informacemi pomocí funkcí Spark SQL:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
Výsledná analýza transformed_df
obsahuje pokyny k načtení a transformaci každého záznamu při příchodu do zdroje dat.
Poznámka:
Strukturované streamování zpracovává zdroje dat jako nevázané nebo nekonečné datové sady. Některé transformace nejsou v úlohách strukturovaného streamování podporované, protože by vyžadovaly řazení nekonečného počtu položek.
Většina agregací a mnoho spojení vyžaduje správu informací o stavu pomocí vodoznaků, oken a výstupního režimu. Viz Použití vodoznaků pro řízení prahových hodnot zpracování dat.
Provedení přírůstkového dávkového zápisu do Delta Lake
Následující příklad zapíše do Delta Lake pomocí zadané cesty k souboru a kontrolního bodu.
Důležité
Vždy se ujistěte, že pro každý nakonfigurovaný zapisovač streamování zadáte jedinečné umístění kontrolního bodu. Kontrolní bod poskytuje jedinečnou identitu streamu, sledování všech zpracovaných záznamů a informací o stavu přidružených k dotazu streamování.
Nastavení availableNow
triggeru dává strukturovanému streamování pokyn ke zpracování všech dříve nezpracovaných záznamů ze zdrojové datové sady a následné vypnutí, takže můžete bezpečně spustit následující kód, aniž byste se museli starat o opuštění streamu spuštěného:
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
V tomto příkladu do našeho zdroje dat nepřicházejí žádné nové záznamy, takže opakované spuštění tohoto kódu neingestuje nové záznamy.
Upozorňující
Spuštění strukturovaného streamování může zabránit vypnutí výpočetních prostředků automatickým ukončením. Abyste se vyhnuli neočekávaným nákladům, nezapomeňte ukončit dotazy streamování.
Čtení dat z Delta Lake, transformace a zápisu do Delta Lake
Delta Lake má rozsáhlou podporu pro práci se strukturovaným streamováním jako zdrojem i jímkou. Podívejte se na Delta table streamované čtení a zápisy.
Následující příklad ukazuje ukázkovou syntaxi pro přírůstkové načtení všech nových záznamů z tableDelta , join je se snímkem jiného rozdílového tablea zapsat je do tableDelta:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
Pro čtení zdrojového tables a zápisu do cílového tables a zadaného umístění kontrolního bodu musíte mít nakonfigurovaná správná oprávnění. Vyplňte všechny parameters označené úhlovými závorkami (<>
) pomocí příslušných values pro zdroje dat a jímky.
Poznámka:
Delta Live Tables poskytuje plně deklarativní syntaxi pro vytváření kanálů Delta Lake a spravuje vlastnosti, jako jsou triggery a kontrolní body, automaticky. Podívejte se na Co je Delta Live Tables?.
Čtení dat ze systému Kafka, transformace a zápisu do Systému Kafka
Apache Kafka a další sběrnice zasílání zpráv poskytují některé z nejnižších latencí dostupných pro velké datové sady. Azure Databricks můžete použít k aplikování transformací na data ingestované z Kafka a následnému zápisu dat zpět do Kafka.
Poznámka:
Zápis dat do cloudového úložiště objektů zvyšuje režijní náklady na latenci. Pokud chcete ukládat data z sběrnice zasílání zpráv v Delta Lake, ale vyžadovat nejnižší možnou latenci pro úlohy streamování, databricks doporučuje nakonfigurovat samostatné úlohy streamování tak, aby ingestovala data do jezera a použila transformace téměř v reálném čase pro podřízené sběrnice zasílání zpráv.
Následující příklad kódu ukazuje jednoduchý vzor pro obohacení dat ze systému Kafka jejich spojením s daty v delta table a následným zápisem zpět do Systému Kafka:
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
Pro přístup ke službě Kafka musíte mít nakonfigurovaná správná oprávnění. Vyplňte všechny parameters označené úhlovými závorkami (<>
) pomocí příslušných values pro zdroje dat a jímky. Viz Zpracování datových proudů s využitím Apache Kafka a Azure Databricks.