Przetwarzanie strumieniowe przy użyciu platformy Apache Kafka i usługi Azure Databricks
W tym artykule opisano, jak można używać platformy Apache Kafka jako źródła lub ujścia podczas uruchamiania obciążeń przesyłania strumieniowego ze strukturą w usłudze Azure Databricks.
Aby uzyskać więcej informacji na temat platformy Kafka, zobacz dokumentację platformy Kafka.
Odczytywanie danych z platformy Kafka
Poniżej przedstawiono przykład przesyłania strumieniowego z platformy Kafka:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Usługa Azure Databricks obsługuje również semantyka odczytu wsadowego dla źródeł danych platformy Kafka, jak pokazano w poniższym przykładzie:
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
W przypadku ładowania przyrostowego wsadowego usługa Databricks zaleca używanie platformy Kafka z usługą Trigger.AvailableNow
. Zobacz Konfigurowanie przyrostowego przetwarzania wsadowego.
W środowisku Databricks Runtime 13.3 LTS i nowszym usługa Azure Databricks udostępnia funkcję SQL do odczytywania danych platformy Kafka. Przesyłanie strumieniowe za pomocą języka SQL jest obsługiwane tylko w tabelach delta live lub w tabelach przesyłania strumieniowego w usłudze Databricks SQL. Zobacz read_kafka funkcji wartości tabeli.
Konfigurowanie czytnika przesyłania strumieniowego ze strukturą platformy Kafka
Usługa Azure Databricks udostępnia kafka
słowo kluczowe jako format danych do konfigurowania połączeń z platformą Kafka w wersji 0.10 lub nowszej.
Poniżej przedstawiono najbardziej typowe konfiguracje platformy Kafka:
Istnieje wiele sposobów określania tematów do zasubskrybowania. Należy podać tylko jeden z następujących parametrów:
Opcja | Wartość | Opis |
---|---|---|
subskrybowanie | Rozdzielona przecinkami lista tematów. | Lista tematów do subskrybowania. |
subscribePattern | Ciąg wyrażenia regularnego języka Java. | Wzorzec używany do subskrybowania tematów. |
przypisywanie | Ciąg {"topicA":[0,1],"topic":[2,4]} JSON . |
Określone tematyPartition do korzystania. |
Inne istotne konfiguracje:
Opcja | Wartość | Wartość domyślna | opis |
---|---|---|---|
kafka.bootstrap.servers | Rozdzielona przecinkami lista host:port. | empty | [Wymagane] Konfiguracja platformy Kafka bootstrap.servers . Jeśli okaże się, że nie ma danych z platformy Kafka, najpierw sprawdź listę adresów brokera. Jeśli lista adresów brokera jest niepoprawna, może nie występować żadne błędy. Dzieje się tak, ponieważ klient platformy Kafka zakłada, że brokerzy staną się dostępne w końcu i w przypadku błędów sieci ponawiania próby na zawsze. |
failOnDataLoss | Usługa true lub false . |
true |
[Opcjonalnie] Czy zapytanie nie powiodło się, jeśli jest możliwe, że dane zostały utracone. Zapytania mogą trwale nie odczytać danych z platformy Kafka ze względu na wiele scenariuszy, takich jak usunięte tematy, obcięcie tematu przed przetworzeniem itd. Staramy się oszacować konserwatywnie, czy dane mogły być utracone, czy nie. Czasami może to spowodować fałszywe alarmy. Ustaw tę opcję na false wartość , jeśli nie działa zgodnie z oczekiwaniami lub chcesz, aby zapytanie kontynuowało przetwarzanie pomimo utraty danych. |
minPartitions | Liczba całkowita >= 0, 0 = wyłączona. | 0 (wyłączone) | [Opcjonalnie] Minimalna liczba partycji do odczytu z platformy Kafka. Platformę Spark można skonfigurować tak, aby używała dowolnej minimalnej liczby partycji do odczytu z platformy Kafka przy użyciu minPartitions opcji . Zwykle platforma Spark ma mapowanie 1–1 tematów platformy KafkaPartitions na partycje platformy Spark zużywane z platformy Kafka. Jeśli ustawisz minPartitions opcję na wartość większą niż temat platformy KafkaPartitions, platforma Spark podzieli duże partycje platformy Kafka na mniejsze elementy. Tę opcję można ustawić w okresach szczytowych obciążeń, niesymetryczności danych i w miarę zwiększania szybkości przetwarzania strumienia. Wiąże się to z kosztem inicjowania użytkowników platformy Kafka w każdym wyzwalaczu, co może mieć wpływ na wydajność w przypadku używania protokołu SSL podczas nawiązywania połączenia z platformą Kafka. |
kafka.group.id | Identyfikator grupy odbiorców platformy Kafka. | nie ustawiono | [Opcjonalnie] Identyfikator grupy do użycia podczas odczytywania z platformy Kafka. Użyj tego z ostrożnością. Domyślnie każde zapytanie generuje unikatowy identyfikator grupy do odczytywania danych. Gwarantuje to, że każde zapytanie ma własną grupę odbiorców, która nie ma wpływu na żadnego innego użytkownika, i dlatego może odczytywać wszystkie partycje subskrybowanych tematów. W niektórych scenariuszach (na przykład autoryzacja oparta na grupach platformy Kafka) możesz użyć określonych autoryzowanych identyfikatorów grup do odczytywania danych. Opcjonalnie można ustawić identyfikator grupy. Jednak zrób to z skrajną ostrożnością, ponieważ może to spowodować nieoczekiwane zachowanie. — Współbieżnie uruchamiane zapytania (zarówno wsadowe, jak i przesyłane strumieniowo) z tym samym identyfikatorem grupy prawdopodobnie zakłócają każdą kwerendę w celu odczytu tylko części danych. — Może to również wystąpić, gdy zapytania są uruchamiane/ponownie uruchamiane w krótkim odstępie czasu. Aby zminimalizować takie problemy, ustaw konfigurację session.timeout.ms klienta platformy Kafka na bardzo małą. |
startOffsets | najwcześniejsza , najnowsza | latest | [Opcjonalnie] Punkt początkowy, gdy zapytanie jest uruchamiane, "najwcześniejsze", czyli od najwcześniejszych przesunięć, lub ciąg json określający przesunięcie początkowe dla każdej części tematu. W formacie json -2 jako przesunięcie może służyć do odwoływania się do najwcześniejszego, od -1 do najnowszego. Uwaga: w przypadku zapytań wsadowych najnowsza wersja (niejawnie lub przy użyciu -1 w formacie json) jest niedozwolona. W przypadku zapytań przesyłanych strumieniowo ma to zastosowanie tylko wtedy, gdy zostanie uruchomione nowe zapytanie, a wznowienie zawsze będzie pobierane z miejsca, w którym zapytanie zostało przerwane. Nowo odnalezione partycje podczas zapytania będą uruchamiane najwcześniej. |
Aby uzyskać informacje o innych opcjonalnych konfiguracjach, zobacz Przewodnik integracji ze strukturą platformy Kafka.
Schemat rekordów platformy Kafka
Schemat rekordów platformy Kafka to:
Kolumna | Type |
---|---|
key | dane binarne |
wartość | dane binarne |
topic | string |
partycji | int |
offset | długi |
timestamp | długi |
timestampType | int |
Obiekty key
i value
są zawsze deserializowane jako tablice bajtów za pomocą .ByteArrayDeserializer
Użyj operacji ramki danych (takich jak cast("string")
) w celu jawnego deserializacji kluczy i wartości.
Zapisywanie danych na platformie Kafka
Poniżej przedstawiono przykład przesyłania strumieniowego zapisu na platformie Kafka:
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Usługa Azure Databricks obsługuje również semantyka zapisu wsadowego na ujściach danych platformy Kafka, jak pokazano w poniższym przykładzie:
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Konfigurowanie modułu zapisywania przesyłania strumieniowego ze strukturą platformy Kafka
Ważne
Środowisko Databricks Runtime 13.3 LTS i nowsze zawierają nowszą wersję kafka-clients
biblioteki, która domyślnie umożliwia zapisywanie idempotentne. Jeśli ujście platformy Kafka używa wersji 2.8.0 lub nowszej ze skonfigurowanymi listami ACL, ale bez IDEMPOTENT_WRITE
włączenia tej opcji zapis kończy się niepowodzeniem z komunikatem org.apache.kafka.common.KafkaException:
Cannot execute transactional method because we are in an error state
o błędzie .
Rozwiąż ten błąd, uaktualniając do platformy Kafka w wersji 2.8.0 lub nowszej lub przez ustawienie .option(“kafka.enable.idempotence”, “false”)
podczas konfigurowania składnika zapisywania przesyłania strumieniowego ze strukturą.
Schemat dostarczony do elementu DataStreamWriter współdziała z ujściem platformy Kafka. Możesz użyć następujących pól:
Nazwa kolumny | Wymagane lub opcjonalne | Typ |
---|---|---|
key |
optional | STRING lub BINARY |
value |
wymagane | STRING lub BINARY |
headers |
optional | ARRAY |
topic |
opcjonalne (ignorowane, jeśli topic jest ustawiona jako opcja zapisywania) |
STRING |
partition |
optional | INT |
Poniżej przedstawiono typowe opcje ustawione podczas zapisywania na platformie Kafka:
Opcja | Wartość | Wartość domyślna | opis |
---|---|---|---|
kafka.boostrap.servers |
Rozdzielona przecinkami lista <host:port> |
Brak | [Wymagane] Konfiguracja platformy Kafka bootstrap.servers . |
topic |
STRING |
nie ustawiono | [Opcjonalnie] Ustawia temat dla wszystkich wierszy do zapisania. Ta opcja zastępuje dowolną kolumnę tematu, która istnieje w danych. |
includeHeaders |
BOOLEAN |
false |
[Opcjonalnie] Określa, czy w wierszu mają być uwzględniane nagłówki platformy Kafka. |
Aby uzyskać informacje o innych opcjonalnych konfiguracjach, zobacz Przewodnik integracji ze strukturą platformy Kafka.
Pobieranie metryk platformy Kafka
Możesz uzyskać średnią, minimalną i maksymalną liczbę przesunięć, które zapytanie przesyłane strumieniowo znajduje się za najnowszym dostępnym przesunięciem wśród wszystkich subskrybowanych tematów z avgOffsetsBehindLatest
metrykami , maxOffsetsBehindLatest
i minOffsetsBehindLatest
. Zobacz Interaktywne odczytywanie metryk.
Uwaga
Dostępne w środowisku Databricks Runtime 9.1 lub nowszym.
Uzyskaj szacowaną łączną liczbę bajtów, z których proces zapytania nie korzysta z subskrybowanych tematów, sprawdzając wartość estimatedTotalBytesBehindLatest
. To oszacowanie jest oparte na partiach, które zostały przetworzone w ciągu ostatnich 300 sekund. Przedział czasu, na podstawie którego jest szacowany, można zmienić, ustawiając opcję bytesEstimateWindowLength
na inną wartość. Aby na przykład ustawić go na 10 minut:
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Jeśli używasz strumienia w notesie, możesz zobaczyć te metryki na karcie Nieprzetworzone dane na pulpicie nawigacyjnym postępu zapytania przesyłania strumieniowego:
{
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic]]",
"metrics" : {
"avgOffsetsBehindLatest" : "4.0",
"maxOffsetsBehindLatest" : "4",
"minOffsetsBehindLatest" : "4",
"estimatedTotalBytesBehindLatest" : "80.0"
},
} ]
}
Łączenie usługi Azure Databricks z platformą Kafka przy użyciu protokołu SSL
Aby włączyć połączenia SSL z platformą Kafka, postępuj zgodnie z instrukcjami w dokumentacji Platformy Confluent Szyfrowanie i uwierzytelnianie przy użyciu protokołu SSL. Konfiguracje opisane w tym miejscu można podać z prefiksem kafka.
, jako opcje. Na przykład należy określić lokalizację magazynu zaufania we właściwości kafka.ssl.truststore.location
.
Usługa Databricks zaleca:
- Przechowywanie certyfikatów w magazynie obiektów w chmurze. Dostęp do certyfikatów można ograniczyć tylko do klastrów, które mogą uzyskiwać dostęp do platformy Kafka. Zobacz Zarządzanie danymi za pomocą wykazu aparatu Unity.
- Przechowuj hasła certyfikatu jako wpisy tajne w zakresie wpisów tajnych.
W poniższym przykładzie użyto lokalizacji magazynu obiektów i wpisów tajnych usługi Databricks w celu włączenia połączenia SSL:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Łączenie platformy Kafka w usłudze HDInsight z usługą Azure Databricks
Utwórz klaster platformy Kafka usługi HDInsight.
Aby uzyskać instrukcje, zobacz Connect to Kafka on HDInsight through an Azure Virtual Network (Nawiązywanie połączenia z platformą Kafka w usłudze HDInsight za pośrednictwem sieci wirtualnej platformy Azure).
Skonfiguruj brokerów platformy Kafka, aby anonsować poprawny adres.
Postępuj zgodnie z instrukcjami w temacie Konfigurowanie platformy Kafka pod kątem reklam IP. Jeśli samodzielnie zarządzasz platformą Kafka w usłudze Azure Virtual Machines, upewnij się, że
advertised.listeners
konfiguracja brokerów jest ustawiona na wewnętrzny adres IP hostów.Utwórz klaster usługi Azure Databricks.
Zaimów klaster Platformy Kafka do klastra usługi Azure Databricks.
Postępuj zgodnie z instrukcjami w temacie Równorzędne sieci wirtualne.
Uwierzytelnianie jednostki usługi przy użyciu identyfikatora Entra firmy Microsoft i usługi Azure Event Hubs
Usługa Azure Databricks obsługuje uwierzytelnianie zadań platformy Spark za pomocą usług Event Hubs. To uwierzytelnianie odbywa się za pośrednictwem protokołu OAuth z identyfikatorem Entra firmy Microsoft.
Usługa Azure Databricks obsługuje uwierzytelnianie identyfikatora Entra firmy Microsoft z identyfikatorem klienta i wpisem tajnym w następujących środowiskach obliczeniowych:
- Środowisko Databricks Runtime 12.2 LTS lub nowsze w środowisku obliczeniowym skonfigurowanym z trybem dostępu pojedynczego użytkownika.
- Środowisko Databricks Runtime 14.3 LTS lub nowsze w środowisku obliczeniowym skonfigurowanym z trybem dostępu współdzielonego.
- Potoki tabel na żywo delty skonfigurowane bez wykazu aparatu Unity.
Usługa Azure Databricks nie obsługuje uwierzytelniania identyfikatora Entra firmy Microsoft z certyfikatem w żadnym środowisku obliczeniowym ani potoków tabel delta live tables skonfigurowanych za pomocą wykazu aparatu Unity.
To uwierzytelnianie nie działa w przypadku udostępnionych klastrów ani tabel różnicowych w wykazie aparatu Unity.
Konfigurowanie łącznika platformy Kafka ze strukturą przesyłania strumieniowego
Aby przeprowadzić uwierzytelnianie przy użyciu identyfikatora Entra firmy Microsoft, potrzebne są następujące wartości:
Identyfikator dzierżawy. Tę pozycję można znaleźć na karcie usługi Microsoft Entra ID .
ClientID (znany również jako identyfikator aplikacji).
Wpis tajny klienta. Po utworzeniu tej funkcji należy dodać go jako wpis tajny do obszaru roboczego usługi Databricks. Aby dodać ten wpis tajny, zobacz Zarządzanie wpisami tajnymi.
Temat usługi EventHubs. Listę tematów można znaleźć w sekcji Event Hubs w sekcji Jednostki na określonej stronie Przestrzeni nazw usługi Event Hubs. Aby pracować z wieloma tematami, możesz ustawić rolę zarządzanie dostępem i tożsamościami na poziomie usługi Event Hubs.
Serwer usługi EventHubs. Możesz to znaleźć na stronie przeglądu określonej przestrzeni nazw usługi Event Hubs:
Ponadto, aby używać identyfikatora Entra, musimy poinformować platformę Kafka, aby korzystała z mechanizmu SASL OAuth (SASL jest protokołem ogólnym, a protokół OAuth jest typem "mechanizmu" SASL):
kafka.security.protocol
powinna byćSASL_SSL
kafka.sasl.mechanism
powinna byćOAUTHBEARER
kafka.sasl.login.callback.handler.class
powinna być w pełni kwalifikowaną nazwą klasy Java z wartościąkafkashaded
dla procedury obsługi wywołania zwrotnego logowania zacienionej klasy kafka. Zobacz poniższy przykład, aby uzyskać dokładną klasę.
Przykład
Następnie przyjrzyjmy się uruchomionego przykładowi:
Python
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
Obsługa potencjalnych błędów
Opcje przesyłania strumieniowego nie są obsługiwane.
Jeśli spróbujesz użyć tego mechanizmu uwierzytelniania w potoku Tabele na żywo delty skonfigurowanego z wykazem aparatu Unity, może zostać wyświetlony następujący błąd:
Aby rozwiązać ten błąd, użyj obsługiwanej konfiguracji obliczeniowej. Zobacz Uwierzytelnianie jednostki usługi za pomocą identyfikatora Entra firmy Microsoft i usługi Azure Event Hubs.
Nie można utworzyć nowego
KafkaAdminClient
elementu .Jest to błąd wewnętrzny zgłaszany przez platformę Kafka, jeśli którakolwiek z następujących opcji uwierzytelniania jest niepoprawna:
- Identyfikator klienta (znany również jako identyfikator aplikacji)
- Identyfikator dzierżawy
- Serwer Usługi EventHubs
Aby rozwiązać ten problem, sprawdź, czy wartości są poprawne dla tych opcji.
Ponadto ten błąd może zostać wyświetlony, jeśli zmodyfikujesz opcje konfiguracji podane domyślnie w przykładzie (które zostały poproszone o niezmodyfikowanie), takie jak
kafka.security.protocol
.Nie są zwracane żadne rekordy
Jeśli próbujesz wyświetlić lub przetworzyć ramkę danych, ale nie otrzymujesz wyników, w interfejsie użytkownika zobaczysz następujące informacje.
Ten komunikat oznacza, że uwierzytelnianie zakończyło się pomyślnie, ale usługa EventHubs nie zwróciła żadnych danych. Niektóre możliwe (choć w żaden sposób wyczerpujące) przyczyny są następujące:
- Określono niewłaściwy temat usługi EventHubs .
- Domyślną opcją
startingOffsets
konfiguracji platformy Kafka jestlatest
, a obecnie nie otrzymujesz żadnych danych za pośrednictwem tematu. Możesz rozpocząćstartingOffsetstoearliest
odczytywanie danych, zaczynając od najwcześniejszych przesunięć platformy Kafka.