Przesyłanie strumieniowe i pozyskiwanie przyrostowe
Usługa Azure Databricks używa przesyłania strumieniowego ze strukturą platformy Apache Spark do obsługi wielu produktów skojarzonych z obciążeniami pozyskiwania, w tym:
- Moduł ładujący automatycznie
COPY INTO
- Potoki tabel na żywo usługi Delta
- Zmaterializowane widoki i tabele przesyłania strumieniowego w usłudze Databricks SQL
W tym artykule omówiono niektóre różnice między semantyki przetwarzania wsadowego przesyłania strumieniowego i przyrostowego oraz przedstawiono ogólne omówienie konfigurowania obciążeń pozyskiwania dla żądanej semantyki w usłudze Databricks.
Jaka jest różnica między pozyskiwaniem wsadowym przesyłania strumieniowego i przyrostowego?
Możliwe konfiguracje przepływu pracy pozyskiwania wahają się od przetwarzania niemal w czasie rzeczywistym do rzadko przyrostowego przetwarzania wsadowego. Oba wzorce używają przesyłania strumieniowego ze strukturą platformy Apache Spark do obsługi przetwarzania przyrostowego, ale mają różne semantyki. Dla uproszczenia ten artykuł odnosi się do pozyskiwania danych w czasie zbliżonym do rzeczywistego jako pozyskiwania przesyłania strumieniowego i częściej przyrostowego przetwarzania wsadowego jako przyrostowego pozyskiwania wsadowego.
Pozyskiwanie danych za pośrednictwem przesyłania strumieniowego
Przesyłanie strumieniowe w kontekście pozyskiwania danych i aktualizacji tabel odnosi się do przetwarzania danych niemal w czasie rzeczywistym, w którym usługa Azure Databricks pozyskuje rekordy ze źródła do ujścia w mikrobajtach przy użyciu zawsze włączonej infrastruktury. Obciążenie przesyłania strumieniowego stale pozyskiwa aktualizacje ze skonfigurowanych źródeł danych, chyba że wystąpi błąd, który zatrzymuje pozyskiwanie.
Pozyskiwanie wsadowe przyrostowe
Pozyskiwanie wsadowe przyrostowe odnosi się do wzorca, w którym wszystkie nowe rekordy są przetwarzane ze źródła danych w krótkotrwałym zadaniu. Pozyskiwanie wsadowe przyrostowe często występuje zgodnie z harmonogramem, ale może być również wyzwalane ręcznie lub na podstawie przybycia pliku.
Pozyskiwanie wsadowe przyrostowe różni się od pozyskiwania wsadowego, ponieważ automatycznie wykrywa nowe rekordy w źródle danych i ignoruje rekordy, które zostały już pozyskane.
Pozyskiwanie przy użyciu zadań
Zadania usługi Databricks umożliwiają organizowanie przepływów pracy i planowanie zadań obejmujących notesy, biblioteki, potoki delta live tables i zapytania SQL usługi Databricks.
Uwaga
Do konfigurowania pozyskiwania przyrostowego wsadowego można użyć wszystkich typów obliczeniowych i typów zadań usługi Azure Databricks. Pozyskiwanie przesyłania strumieniowego jest obsługiwane tylko w środowisku produkcyjnym na obliczeniach klasycznych zadań i tabelach Delta Live Tables.
Zadania mają dwa podstawowe tryby działania:
- Zadania ciągłe są automatycznie ponawiane w przypadku wystąpienia awarii. Ten tryb jest przeznaczony do pozyskiwania danych przesyłanych strumieniowo.
-
Wyzwalane zadania uruchamiają zadania po wyzwoleniu. Wyzwalacze obejmują:
- Wyzwalacze oparte na czasie, które uruchamiają zadania zgodnie z określonym harmonogramem.
- Wyzwalacze oparte na plikach, które uruchamiają zadania, gdy pliki trafiają do określonej lokalizacji.
- Inne wyzwalacze, takie jak wywołania interfejsu API REST, wykonywanie poleceń interfejsu wiersza polecenia usługi Azure Databricks lub kliknięcie przycisku Uruchom teraz w interfejsie użytkownika obszaru roboczego.
W przypadku obciążeń wsadowych przyrostowych skonfiguruj zadania przy użyciu AvailableNow
trybu wyzwalacza w następujący sposób:
Python
(df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
import org.apache.spark.sql.streaming.Trigger
df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("table_name")
W przypadku obciążeń przesyłania strumieniowego domyślny interwał wyzwalacza to processingTime ="500ms"
. W poniższym przykładzie pokazano, jak przetwarzać mikrosadę co 5 sekund:
Python
(df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(processingTime="5 seconds")
.toTable("table_name")
)
Scala
import org.apache.spark.sql.streaming.Trigger
df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.ProcessingTime, "5 seconds")
.toTable("table_name")
Ważne
Zadania bezserwerowe nie obsługują interwałów wyzwalaczy scala, trybu ciągłego ani interwałów wyzwalaczy opartych na czasie dla przesyłania strumieniowego ze strukturą. Użyj zadań klasycznych, jeśli potrzebujesz semantyki pozyskiwania danych niemal w czasie rzeczywistym.
Pozyskiwanie przy użyciu tabel różnicowych na żywo
Podobnie jak w przypadku zadań potoki delta live tables mogą być uruchamiane w trybie wyzwalanym lub ciągłym. W przypadku semantyki przesyłania strumieniowego niemal w czasie rzeczywistym z tabelami przesyłania strumieniowego użyj trybu ciągłego.
Użyj tabel przesyłania strumieniowego, aby skonfigurować pozyskiwanie danych strumieniowych lub przyrostowych partii z magazynu obiektów w chmurze, Apache Kafka, Amazon Kinesis, Google Pub/Sub lub Apache Pulsar.
Program LakeFlow Connect używa tabel delta live do konfigurowania potoków pozyskiwania z połączonych systemów. Zobacz LakeFlow Connect.
Zmaterializowane widoki gwarantują semantyka operacji równoważną obciążeniom wsadowym, ale może zoptymalizować wiele operacji w celu obliczenia wyników przyrostowo. Zobacz Odświeżanie przyrostowe, aby uzyskać zmaterializowane widoki.
Pozyskiwanie przy użyciu usługi Databricks SQL
Tabele przesyłania strumieniowego umożliwiają konfigurowanie przyrostowego pozyskiwania wsadowego z magazynu obiektów w chmurze, Apache Kafka, Amazon Kinesis, Google Pub/Sub lub Apache Pulsar.
Za pomocą zmaterializowanych widoków można skonfigurować przyrostowe przetwarzanie wsadowe z źródeł Delta. Zobacz Odświeżanie przyrostowe dla widoków zmaterializowanych.
COPY INTO
Udostępnia znaną składnię SQL na potrzeby przyrostowego przetwarzania wsadowego dla plików danych w magazynie obiektów w chmurze.
COPY INTO
zachowanie jest podobne do wzorców obsługiwanych przez tabele przesyłania strumieniowego dla magazynu obiektów w chmurze, ale nie wszystkie ustawienia domyślne są równoważne dla wszystkich obsługiwanych formatów plików.