Uruchamianie pierwszego obciążenia przesyłania strumieniowego ze strukturą
Ten artykuł zawiera przykłady kodu i wyjaśnienie podstawowych pojęć niezbędnych do uruchamiania pierwszych zapytań przesyłania strumieniowego ze strukturą w usłudze Azure Databricks. Możesz użyć przesyłania strumieniowego ze strukturą na potrzeby obciążeń przetwarzania przyrostowego i niemal w czasie rzeczywistym.
Przesyłanie strumieniowe ze strukturą to jedna z kilku technologii, które zasilają tabele przesyłania strumieniowego w tabelach delta live. Usługa Databricks zaleca używanie tabel delta Live Tables dla wszystkich nowych obciążeń ETL, pozyskiwania i przesyłania strumieniowego ze strukturą. Zobacz Co to jest delta live tables?.
Uwaga
Podczas gdy tabele delta live zapewniają nieco zmodyfikowaną składnię deklarowania tabel przesyłania strumieniowego, ogólna składnia konfigurowania odczytów i przekształceń przesyłania strumieniowego dotyczy wszystkich przypadków użycia przesyłania strumieniowego w usłudze Azure Databricks. Delta Live Tables upraszcza również przesyłanie strumieniowe, zarządzając informacjami o stanie, metadanymi i wieloma konfiguracjami.
Używanie automatycznego modułu ładującego do odczytywania danych przesyłanych strumieniowo z magazynu obiektów
W poniższym przykładzie pokazano ładowanie danych JSON za pomocą modułu automatycznego ładującego, które używa cloudFiles
do oznaczania formatu i opcji. Opcja schemaLocation
umożliwia wnioskowanie i ewolucję schematu. Wklej następujący kod w komórce notesu usługi Databricks i uruchom komórkę, aby utworzyć ramkę danych przesyłania strumieniowego o nazwie raw_df
:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Podobnie jak inne operacje odczytu w usłudze Azure Databricks, konfigurowanie odczytu przesyłania strumieniowego w rzeczywistości nie powoduje załadowania danych. Musisz wyzwolić akcję na danych przed rozpoczęciem strumienia.
Uwaga
Wywołanie display()
ramki danych przesyłania strumieniowego uruchamia zadanie przesyłania strumieniowego. W przypadku większości przypadków użycia przesyłania strumieniowego ze strukturą akcja, która wyzwala strumień, powinna zapisywać dane w ujściu. Zobacz Zagadnienia dotyczące produkcji przesyłania strumieniowego ze strukturą.
Wykonywanie transformacji przesyłania strumieniowego
Przesyłanie strumieniowe ze strukturą obsługuje większość przekształceń dostępnych w usługach Azure Databricks i Spark SQL. Modele MLflow można nawet załadować jako funkcje zdefiniowane przez użytkownika i przewidywać przesyłanie strumieniowe jako transformację.
Poniższy przykład kodu wykonuje prostą transformację, aby wzbogacić pozyskane dane JSON o dodatkowe informacje przy użyciu funkcji Spark SQL:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
transformed_df
Wynik zawiera instrukcje zapytania dotyczące ładowania i przekształcania każdego rekordu w miarę ich nadejścia w źródle danych.
Uwaga
Przesyłanie strumieniowe ze strukturą traktuje źródła danych jako niezwiązane lub nieskończone zestawy danych. W związku z tym niektóre przekształcenia nie są obsługiwane w obciążeniach przesyłania strumieniowego ze strukturą, ponieważ wymagają one sortowania nieskończonej liczby elementów.
Większość agregacji i wielu sprzężeń wymaga zarządzania informacjami o stanie za pomocą znaków wodnych, okien i trybu danych wyjściowych. Zobacz Stosowanie znaków wodnych, aby kontrolować progi przetwarzania danych.
Wykonywanie przyrostowego zapisu wsadowego w usłudze Delta Lake
Poniższy przykład zapisuje w usłudze Delta Lake przy użyciu określonej ścieżki pliku i punktu kontrolnego.
Ważne
Zawsze upewnij się, że określono unikatową lokalizację punktu kontrolnego dla każdego skonfigurowanego składnika zapisywania przesyłania strumieniowego. Punkt kontrolny zapewnia unikatową tożsamość strumienia, śledząc wszystkie przetworzone rekordy i informacje o stanie skojarzone z zapytaniem przesyłanym strumieniowo.
Ustawienie availableNow
wyzwalacza powoduje, że przesyłanie strumieniowe ze strukturą przetwarza wszystkie wcześniej nieprzetworzone rekordy z źródłowego zestawu danych, a następnie zamyka, aby można było bezpiecznie wykonać następujący kod bez obaw o pozostawienie strumienia uruchomionego:
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
W tym przykładzie żadne nowe rekordy nie docierają do naszego źródła danych, więc powtórzenie wykonywania tego kodu nie powoduje pozyskiwania nowych rekordów.
Ostrzeżenie
Wykonywanie przesyłania strumieniowego ze strukturą może uniemożliwić automatyczne zakończenie zamykania zasobów obliczeniowych. Aby uniknąć nieoczekiwanych kosztów, pamiętaj o przerwaniu zapytań przesyłania strumieniowego.
Odczytywanie danych z usługi Delta Lake, przekształcanie i zapisywanie w usłudze Delta Lake
Usługa Delta Lake ma rozbudowaną obsługę pracy z przesyłaniem strumieniowym ze strukturą jako źródłem i ujściem. Zobacz Delta table streaming reads and writes (Odczyty i zapisy w tabeli delty).
W poniższym przykładzie pokazano przykładową składnię umożliwiającą przyrostowe ładowanie wszystkich nowych rekordów z tabeli delty, łączenie ich z migawką innej tabeli delty i zapisywanie ich w tabeli delty:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
Musisz mieć odpowiednie uprawnienia skonfigurowane do odczytu tabel źródłowych i zapisu w tabelach docelowych oraz określonej lokalizacji punktu kontrolnego. Wypełnij wszystkie parametry oznaczone nawiasami kątowymi (<>
) przy użyciu odpowiednich wartości dla źródeł danych i ujść.
Uwaga
Delta Live Tables zapewnia w pełni deklaratywną składnię tworzenia potoków usługi Delta Lake i automatycznie zarządza właściwościami, takimi jak wyzwalacze i punkty kontrolne. Zobacz Co to jest delta live tables?.
Odczytywanie danych z platformy Kafka, przekształcanie i zapisywanie na platformie Kafka
Platforma Apache Kafka i inne magistrale obsługi komunikatów zapewniają jedne z najniższych opóźnień dostępnych dla dużych zestawów danych. Za pomocą usługi Azure Databricks można zastosować przekształcenia do danych pozyskanych z platformy Kafka, a następnie zapisywać dane z powrotem na platformie Kafka.
Uwaga
Zapisywanie danych w magazynie obiektów w chmurze zwiększa dodatkowe obciążenie związane z opóźnieniami. Jeśli chcesz przechowywać dane z magistrali komunikatów w usłudze Delta Lake, ale wymagają najmniejszego możliwego opóźnienia w przypadku obciążeń przesyłanych strumieniowo, usługa Databricks zaleca skonfigurowanie oddzielnych zadań przesyłania strumieniowego w celu pozyskiwania danych do usługi Lakehouse i stosowania przekształceń niemal w czasie rzeczywistym na potrzeby ujścia magistrali komunikatów podrzędnych.
Poniższy przykład kodu przedstawia prosty wzorzec wzbogacania danych z platformy Kafka przez dołączenie ich do danych w tabeli delty, a następnie zapisanie z powrotem na platformie Kafka:
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
Musisz mieć odpowiednie uprawnienia skonfigurowane do uzyskiwania dostępu do usługi Kafka. Wypełnij wszystkie parametry oznaczone nawiasami kątowymi (<>
) przy użyciu odpowiednich wartości dla źródeł danych i ujść. Zobacz Przetwarzanie strumieniowe przy użyciu platform Apache Kafka i Azure Databricks.