Použití příkazu foreachBatch k zápisu do libovolných datových jímek
Tento článek popisuje použití foreachBatch
se strukturovaným streamováním k zápisu výstupu dotazu streamování do zdrojů dat, které nemají existující jímku streamování.
Vzor streamingDF.writeStream.foreachBatch(...)
kódu umožňuje použít dávkové funkce na výstupní data každého mikrodávkového dotazu streamování. Funkce používané s foreachBatch
berou dvě parameters:
- Datový rámec, který obsahuje výstupní data mikrodávkové dávky.
- Jedinečné ID mikrodávkové dávky.
Musíte použít foreachBatch
pro operace sloučení Delta Lake ve strukturovaném streamování. Viz Upsert ze streamovaných dotazů pomocí foreachBatch.
Použití dalších operací datového rámce
V streamovaných datových rámcích se nepodporuje mnoho operací datových rámců a datových sad, protože Spark v těchto případech nepodporuje generování přírůstkových plánů. Některé z těchto operací můžete použít foreachBatch()
na každém výstupu mikrodávkové dávky. Například můžete použít foreachBatch()
a SQL operaci MERGE INTO
pro zapsání výstupu ze streamovacích agregací do Delta table v režimu update. Další podrobnosti najdete v MERGE INTO.
Důležité
-
foreachBatch()
poskytuje záruky zápisu pouze jednou. Můžete ale použítbatchId
, která je poskytnuta funkci, jako způsob ke deduplikaci výstupu a get jako záruku přesného zpracování jednou. V obou případech budete muset zdůvodnění kompletní sémantiky sami. -
foreachBatch()
nefunguje s režimem průběžného zpracování, protože se v podstatě spoléhá na mikrodávkové spouštění streamovacího dotazu. Pokud zapisujete data v nepřetržitém režimu, použijteforeach()
místo toho.
Prázdný datový rámec lze vyvolat a foreachBatch()
uživatelský kód musí být odolný, aby umožňoval správné operace. Příklad najdete tady:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Změny chování v foreachBatch
Databricks Runtime 14.0
Ve službě Databricks Runtime 14.0 a vyšší na výpočetních prostředcích nakonfigurovaných v režimu sdíleného přístupu platí následující změny chování:
-
print()
příkazy zapisuje výstup do protokolů ovladačů. - K dílčímu
dbutils.widgets
modulu uvnitř funkce nelze získat přístup. - Všechny soubory, moduly nebo objekty odkazované ve funkci musí být serializovatelné a dostupné ve Sparku.
Opakované použití existujících dávkových zdrojů dat
Pomocí foreachBatch()
, můžete použít existující dávkové zapisovače dat pro jímky dat, které nemusí mít podporu strukturovaného streamování. Tady je pár příkladů:
Mnoho dalších dávkových zdrojů dat lze použít z foreachBatch()
. Viz Připojení ke zdrojům dat.
Zápis do více umístění
Pokud potřebujete napsat výstup streamovacího dotazu do více umístění, databricks doporučuje používat více zapisovačů strukturovaného streamování pro zajištění nejlepší paralelizace a propustnosti.
Použití foreachBatch
k zápisu do více jímek serializuje provádění zápisů streamování, což může zvýšit latenci pro každou mikrodávku.
Pokud používáte foreachBatch
k zápisu do více objektů Delta tables, přečtěte si část tableforeachBatch .