Sdílet prostřednictvím


Použití příkazu foreachBatch k zápisu do libovolných datových jímek

Tento článek popisuje použití foreachBatch se strukturovaným streamováním k zápisu výstupu dotazu streamování do zdrojů dat, které nemají existující jímku streamování.

Vzor streamingDF.writeStream.foreachBatch(...) kódu umožňuje použít dávkové funkce na výstupní data každého mikrodávkového dotazu streamování. Funkce používané s foreachBatch berou dvě parameters:

  • Datový rámec, který obsahuje výstupní data mikrodávkové dávky.
  • Jedinečné ID mikrodávkové dávky.

Musíte použít foreachBatch pro operace sloučení Delta Lake ve strukturovaném streamování. Viz Upsert ze streamovaných dotazů pomocí foreachBatch.

Použití dalších operací datového rámce

V streamovaných datových rámcích se nepodporuje mnoho operací datových rámců a datových sad, protože Spark v těchto případech nepodporuje generování přírůstkových plánů. Některé z těchto operací můžete použít foreachBatch() na každém výstupu mikrodávkové dávky. Například můžete použít foreachBatch() a SQL operaci MERGE INTO pro zapsání výstupu ze streamovacích agregací do Delta table v režimu update. Další podrobnosti najdete v MERGE INTO.

Důležité

  • foreachBatch() poskytuje záruky zápisu pouze jednou. Můžete ale použít batchId, která je poskytnuta funkci, jako způsob ke deduplikaci výstupu a get jako záruku přesného zpracování jednou. V obou případech budete muset zdůvodnění kompletní sémantiky sami.
  • foreachBatch()nefunguje s režimem průběžného zpracování, protože se v podstatě spoléhá na mikrodávkové spouštění streamovacího dotazu. Pokud zapisujete data v nepřetržitém režimu, použijte foreach() místo toho.

Prázdný datový rámec lze vyvolat a foreachBatch() uživatelský kód musí být odolný, aby umožňoval správné operace. Příklad najdete tady:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Změny chování v foreachBatch Databricks Runtime 14.0

Ve službě Databricks Runtime 14.0 a vyšší na výpočetních prostředcích nakonfigurovaných v režimu sdíleného přístupu platí následující změny chování:

  • print() příkazy zapisuje výstup do protokolů ovladačů.
  • K dílčímu dbutils.widgets modulu uvnitř funkce nelze získat přístup.
  • Všechny soubory, moduly nebo objekty odkazované ve funkci musí být serializovatelné a dostupné ve Sparku.

Opakované použití existujících dávkových zdrojů dat

Pomocí foreachBatch(), můžete použít existující dávkové zapisovače dat pro jímky dat, které nemusí mít podporu strukturovaného streamování. Tady je pár příkladů:

Mnoho dalších dávkových zdrojů dat lze použít z foreachBatch(). Viz Připojení ke zdrojům dat.

Zápis do více umístění

Pokud potřebujete napsat výstup streamovacího dotazu do více umístění, databricks doporučuje používat více zapisovačů strukturovaného streamování pro zajištění nejlepší paralelizace a propustnosti.

Použití foreachBatch k zápisu do více jímek serializuje provádění zápisů streamování, což může zvýšit latenci pro každou mikrodávku.

Pokud používáte foreachBatch k zápisu do více objektů Delta tables, přečtěte si část tableforeachBatch .