Udostępnij za pośrednictwem


Używanie narzędzia foreachBatch do zapisywania w dowolnych ujściach danych

W tym artykule omówiono używanie funkcji foreachBatch przesyłania strumieniowego ze strukturą do zapisywania danych wyjściowych zapytania przesyłania strumieniowego do źródeł danych, które nie mają istniejącego ujścia przesyłania strumieniowego.

Wzorzec streamingDF.writeStream.foreachBatch(...) kodu umożliwia zastosowanie funkcji wsadowych do danych wyjściowych każdej mikrosadowej zapytania przesyłania strumieniowego. Funkcje używane z użyciem foreachBatch dwóch parametrów:

  • Ramka danych zawierająca dane wyjściowe mikrosadowej.
  • Unikatowy identyfikator mikrosadowej partii.

Operacje scalania usługi Delta Lake należy używać foreachBatch w przesyłania strumieniowego ze strukturą. Zobacz Upsert from streaming queries using foreachBatch (Upsert z zapytań przesyłanych strumieniowo przy użyciu polecenia foreachBatch).

Stosowanie dodatkowych operacji ramki danych

Wiele operacji ramek danych i zestawów danych nie jest obsługiwanych w przypadku przesyłania strumieniowego ramek danych, ponieważ platforma Spark nie obsługuje generowania planów przyrostowych w tych przypadkach. Za pomocą foreachBatch() tych operacji można zastosować niektóre operacje na danych wyjściowych mikrosadowych. Na przykład można użyć foreachBath() operacji SQL MERGE INTO i zapisać dane wyjściowe agregacji przesyłania strumieniowego do tabeli delty w trybie aktualizacji. Zobacz więcej szczegółów w temacie MERGE INTO(SCAL INTO).

Ważne

  • foreachBatch() zapewnia tylko co najmniej jednokrotne gwarancje zapisu. Można jednak użyć podanej batchId funkcji jako sposobu deduplikacji danych wyjściowych i uzyskać dokładnie jednokrotną gwarancję. W obu przypadkach konieczne będzie samodzielne wnioskowanie o semantyce kompleksowej.
  • foreachBatch()nie działa w trybie ciągłego przetwarzania, ponieważ zasadniczo opiera się na mikrosadowym wykonywaniu zapytania przesyłania strumieniowego. Jeśli zapisujesz dane w trybie ciągłym, użyj foreach() zamiast tego.

Pustą ramkę danych można wywołać za pomocą foreachBatch() elementu , a kod użytkownika musi być odporny, aby umożliwić właściwą operację. Przykład można znaleźć tutaj:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Zmiany zachowania w foreachBatch środowisku Databricks Runtime 14.0

W środowisku Databricks Runtime 14.0 lub nowszym w środowisku obliczeniowym skonfigurowanym z trybem dostępu współdzielonego obowiązują następujące zmiany zachowania:

  • print() polecenia zapisują dane wyjściowe w dziennikach sterowników.
  • Nie można uzyskać dostępu do modułu dbutils.widgets podrzędnego wewnątrz funkcji.
  • Wszystkie pliki, moduły lub obiekty, do których odwołuje się funkcja, muszą być serializowalne i dostępne na platformie Spark.

Ponowne używanie istniejących źródeł danych wsadowych

Za pomocą programu foreachBatch()można użyć istniejących modułów zapisywania danych wsadowych do ujścia danych, które mogą nie obsługiwać przesyłania strumieniowego ze strukturą. Oto kilka przykładów:

Wiele innych źródeł danych wsadowych może być używanych z programu foreachBatch(). Zobacz Nawiązywanie połączenia ze źródłami danych.

Zapisywanie w wielu lokalizacjach

Jeśli musisz napisać dane wyjściowe zapytania przesyłania strumieniowego w wielu lokalizacjach, usługa Databricks zaleca używanie wielu składników zapisywania przesyłania strumieniowego ze strukturą w celu uzyskania najlepszej równoległości i przepływności.

Służy foreachBatch do zapisywania w wielu ujściach serializuje wykonywanie zapisów przesyłanych strumieniowo, co może zwiększyć opóźnienie dla każdej mikrosadowej partii.

Jeśli używasz foreachBatch funkcji do zapisywania w wielu tabelach delty, zobacz Idempotent table writes in foreachBatch (Zapisy w tabelach idempotentnych w foreachBatch).