Udostępnij za pośrednictwem


Przetwarzanie strumieniowe przy użyciu platformy Apache Kafka i usługi Azure Databricks

W tym artykule opisano, jak można używać platformy Apache Kafka jako źródła lub ujścia podczas uruchamiania obciążeń przesyłania strumieniowego ze strukturą w usłudze Azure Databricks.

Aby uzyskać więcej informacji na temat platformy Kafka, zobacz dokumentację platformy Kafka.

Odczytywanie danych z platformy Kafka

Poniżej przedstawiono przykład przesyłania strumieniowego z platformy Kafka:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Usługa Azure Databricks obsługuje również semantyka odczytu wsadowego dla źródeł danych platformy Kafka, jak pokazano w poniższym przykładzie:

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

W przypadku ładowania przyrostowego wsadowego usługa Databricks zaleca używanie platformy Kafka z usługą Trigger.AvailableNow. Zobacz Konfigurowanie przyrostowego przetwarzania wsadowego.

W środowisku Databricks Runtime 13.3 LTS i nowszym usługa Azure Databricks udostępnia funkcję SQL do odczytywania danych platformy Kafka. Przesyłanie strumieniowe za pomocą języka SQL jest obsługiwane tylko w usłudze Delta Live Tables lub z tables przesyłania strumieniowego w usłudze Databricks SQL. Zobacz read_kafka table-wartościową funkcję.

Konfigurowanie czytnika przesyłania strumieniowego ze strukturą platformy Kafka

Usługa Azure Databricks udostępnia słowo kluczowe kafka jako format danych do konfigurowania connections na platformie Kafka 0.10 lub nowszej.

Poniżej przedstawiono najbardziej typowe konfiguracje platformy Kafka:

Istnieje wiele sposobów określania tematów do zasubskrybowania. Należy podać tylko jeden z tych parameters:

Opcja Wartość Opis
subskrybowanie Lista tematów rozdzielona przecinkami, list. Temat list do subskrypcji.
subscribePattern Ciąg wyrażenia regularnego języka Java. Wzorzec używany do subskrybowania tematów.
przypisywanie Ciąg {"topicA":[0,1],"topic":[2,4]}JSON . Określone tematyPartition do korzystania.

Inne istotne konfiguracje:

Opcja Wartość Wartość domyślna Opis
kafka.bootstrap.servers Rozdzielane przecinkami list host:port. empty [Wymagane] Konfiguracja platformy Kafka bootstrap.servers . Jeśli okaże się, że nie ma danych z platformy Kafka, najpierw sprawdź adres brokera list. Jeśli adres brokera list jest niepoprawny, może nie wystąpić żaden błąd. Dzieje się tak, ponieważ klient platformy Kafka zakłada, że brokerzy staną się dostępne w końcu i w przypadku błędów sieci ponawiania próby na zawsze.
failOnDataLoss Usługa true lub false. true [Opcjonalnie] Czy zapytanie nie powiodło się, jeśli jest możliwe, że dane zostały utracone. Zapytania mogą trwale nie odczytać danych z platformy Kafka ze względu na wiele scenariuszy, takich jak usunięte tematy, obcięcie tematu przed przetworzeniem itd. Staramy się oszacować konserwatywnie, czy dane mogły być utracone, czy nie. Czasami może to spowodować fałszywe alarmy. Set tę opcję na false, jeśli nie działa zgodnie z planem, lub jeśli chcesz, aby zapytanie kontynuowało przetwarzanie pomimo utraty danych.
minPartitions Liczba całkowita >= 0, 0 = wyłączona. 0 (wyłączone) [Opcjonalnie] Minimalna liczba partycji do odczytu z platformy Kafka. Platformę Spark można skonfigurować tak, aby używała dowolnej minimalnej liczby partycji do odczytu z platformy Kafka przy użyciu minPartitions opcji . Zwykle platforma Spark ma mapowanie 1–1 tematów platformy KafkaPartitions na partycje platformy Spark zużywane z platformy Kafka. Jeśli set opcji minPartitions wartości większej niż temat Platformy KafkaPartitions, platforma Spark podzieli duże partycje platformy Kafka na mniejsze elementy. Ta opcja może być set w okresach szczytowych obciążeń, nierównomiernego rozkładu danych i gdy Twój strumień nie nadąża, aby zwiększyć szybkość przetwarzania. Wiąże się to z kosztem inicjowania użytkowników platformy Kafka w każdym wyzwalaczu, co może mieć wpływ na wydajność w przypadku używania protokołu SSL podczas nawiązywania połączenia z platformą Kafka.
kafka.group.id Identyfikator grupy odbiorców platformy Kafka. nie set [Opcjonalnie] Identyfikator grupy do użycia podczas odczytywania z platformy Kafka. Użyj tego z ostrożnością. Domyślnie każde zapytanie generuje unikatowy identyfikator grupy do odczytywania danych. Gwarantuje to, że każde zapytanie ma własną grupę odbiorców, która nie ma wpływu na żadnego innego użytkownika, i dlatego może odczytywać wszystkie partycje subskrybowanych tematów. W niektórych scenariuszach (na przykład autoryzacja oparta na grupach platformy Kafka) możesz użyć określonych autoryzowanych identyfikatorów grup do odczytywania danych. Możesz opcjonalnie wykonać operację set na identyfikatorze grupy. Jednak zrób to z skrajną ostrożnością, ponieważ może to spowodować nieoczekiwane zachowanie.

— Współbieżnie uruchamiane zapytania (zarówno wsadowe, jak i przesyłane strumieniowo) z tym samym identyfikatorem grupy prawdopodobnie zakłócają każdą kwerendę w celu odczytu tylko części danych.
— Może to również wystąpić, gdy zapytania są uruchamiane/ponownie uruchamiane w krótkim odstępie czasu. Aby zminimalizować takie problemy, set konfiguracja klienta platformy Kafka session.timeout.ms być bardzo mała.
startOffsets najwcześniejsza , najnowsza latest [Opcjonalnie] Punkt startowy, gdy zapytanie jest uruchamiane: „najwcześniejsze”, czyli od najwcześniejszych offsetów, lub ciąg JSON określający początkowy offset dla każdego TopicPartition. W formacie JSON -2 jako offset może być używane do wskazywania najwcześniejszego, a -1 do najnowszego. Uwaga: w przypadku zapytań wsadowych najnowsza wersja (niejawnie lub przy użyciu -1 w formacie json) jest niedozwolona. W przypadku zapytań przesyłanych strumieniowo ma to zastosowanie tylko wtedy, gdy uruchomione zostanie nowe zapytanie, a wznowienie zawsze rozpocznie się od miejsca, w którym zapytanie zostało przerwane w where. Nowo odnalezione partycje podczas zapytania będą uruchamiane najwcześniej.

Aby uzyskać informacje o innych opcjonalnych konfiguracjach, zobacz Przewodnik integracji ze strukturą platformy Kafka.

Schema dla rekordów platformy Kafka

schema rekordów platformy Kafka to:

Column Type
key dane binarne
wartość dane binarne
topic string
partition int
offset długi
timestamp długi
timestampType int

Obiekty key i value są zawsze deserializowane jako tablice bajtów za pomocą .ByteArrayDeserializer Użyj operacji DataFrame (takich jak cast("string")), aby jawnie zdeserializować klucze i values.

Zapisywanie danych na platformie Kafka

Poniżej przedstawiono przykład przesyłania strumieniowego zapisu na platformie Kafka:

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Usługa Azure Databricks obsługuje również semantyka zapisu wsadowego na ujściach danych platformy Kafka, jak pokazano w poniższym przykładzie:

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Konfigurowanie składnika zapisu przesyłania strumieniowego Apache Kafka

Ważne

Środowisko Databricks Runtime 13.3 LTS i nowsze zawierają nowszą wersję kafka-clients biblioteki, która domyślnie umożliwia zapisywanie idempotentne. Jeśli ujście platformy Kafka używa wersji 2.8.0 lub nowszej ze skonfigurowanymi listami ACL, ale bez IDEMPOTENT_WRITE włączenia tej opcji zapis kończy się niepowodzeniem z komunikatem org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error stateo błędzie .

Rozwiąż ten błąd, uaktualniając do platformy Kafka w wersji 2.8.0 lub nowszej lub przez ustawienie .option(“kafka.enable.idempotence”, “false”) podczas konfigurowania składnika zapisywania przesyłania strumieniowego ze strukturą.

schema dostarczone do obiektu DataStreamWriter współdziała z syfonem platformy Kafka. Możesz użyć następujących pól:

nazwa Column Wymagane lub opcjonalne Type
key optional STRING lub BINARY
value wymagane STRING lub BINARY
headers optional ARRAY
topic opcjonalne (ignorowane, jeśli topic jest set jako opcja pisarza) STRING
partition optional INT

Poniżej przedstawiono typowe opcje set podczas zapisywania na platformie Kafka:

Opcja Wartość Wartość domyślna Opis
kafka.boostrap.servers Rozdzielane przecinkami list<host:port> Brak [Wymagane] Konfiguracja platformy Kafka bootstrap.servers .
topic STRING nie set [Opcjonalnie] Ustawia temat dla wszystkich wierszy do zapisania. Ta opcja zastępuje jakikolwiek temat column, który istnieje w danych.
includeHeaders BOOLEAN false [Opcjonalnie] Określa, czy w wierszu mają być uwzględniane nagłówki platformy Kafka.

Aby uzyskać informacje o innych opcjonalnych konfiguracjach, zobacz Przewodnik integracji ze strukturą platformy Kafka.

Pobieranie metryk platformy Kafka

Możesz get średnią, minimalną i maksymalną liczbę przesunięć, które zapytanie przesyłane strumieniowo znajduje się za najnowszymi dostępnymi offset wśród wszystkich subskrybowanych tematów z metrykami avgOffsetsBehindLatest, maxOffsetsBehindLatesti minOffsetsBehindLatest. Zobacz Interaktywne odczytywanie metryk.

Uwaga

Dostępne w środowisku Databricks Runtime 9.1 lub nowszym.

Get szacowaną łączną liczbę bajtów, z których proces zapytania nie korzysta z subskrybowanych tematów, sprawdzając wartość estimatedTotalBytesBehindLatest. To oszacowanie jest oparte na partiach, które zostały przetworzone w ciągu ostatnich 300 sekund. Przedział czasu, na podstawie którego jest szacowany, można zmienić, ustawiając opcję bytesEstimateWindowLength na inną wartość. Na przykład, aby wykonać set do 10 minut:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Jeśli używasz strumienia w notesie, możesz zobaczyć te metryki na karcie Nieprzetworzone dane na pulpicie nawigacyjnym postępu zapytania przesyłania strumieniowego:

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    },
  } ]
}

Łączenie usługi Azure Databricks z platformą Kafka przy użyciu protokołu SSL

Aby włączyć SSL connections w Kafka, postępuj zgodnie z instrukcjami w dokumentacji Confluent Szyfrowanie i uwierzytelnianie z SSL. Konfiguracje opisane w tym miejscu można podać z prefiksem kafka., jako opcje. Na przykład należy określić lokalizację magazynu zaufania we właściwości kafka.ssl.truststore.location.

Usługa Databricks zaleca:

  • Przechowywanie certyfikatów w magazynie obiektów w chmurze. Dostęp do certyfikatów można ograniczyć tylko do klastrów, które mogą uzyskiwać dostęp do platformy Kafka. Zobacz zarządzanie danymi za pomocą Unity Catalog.
  • Przechowuj hasła certyfikatu jako wpisy tajne w zakresie wpisów tajnych.

W poniższym przykładzie użyto lokalizacji magazynu obiektów i wpisów tajnych usługi Databricks w celu włączenia połączenia SSL:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Łączenie platformy Kafka w usłudze HDInsight z usługą Azure Databricks

  1. Utwórz klaster platformy Kafka usługi HDInsight.

    Aby uzyskać instrukcje, zobacz Connect to Kafka on HDInsight through an Azure Virtual Network (Nawiązywanie połączenia z platformą Kafka w usłudze HDInsight za pośrednictwem sieci wirtualnej platformy Azure).

  2. Skonfiguruj brokerów platformy Kafka, aby anonsować poprawny adres.

    Postępuj zgodnie z instrukcjami w temacie Konfigurowanie platformy Kafka pod kątem reklam IP. Jeśli samodzielnie zarządzasz platformą Kafka w usłudze Azure Virtual Machines, upewnij się, że konfiguracja advertised.listeners brokerów jest set do wewnętrznego adresu IP hostów.

  3. Utwórz klaster usługi Azure Databricks.

  4. Zaimów klaster Platformy Kafka do klastra usługi Azure Databricks.

    Postępuj zgodnie z instrukcjami w temacie Równorzędne sieci wirtualne.

Uwierzytelnianie jednostki usługi przy użyciu identyfikatora Entra firmy Microsoft i usługi Azure Event Hubs

Usługa Azure Databricks obsługuje uwierzytelnianie zadań platformy Spark za pomocą usług Event Hubs. To uwierzytelnianie odbywa się za pośrednictwem protokołu OAuth z identyfikatorem Entra firmy Microsoft.

Diagram uwierzytelniania usługi AAD

Usługa Azure Databricks obsługuje uwierzytelnianie identyfikatora Entra firmy Microsoft z identyfikatorem klienta i wpisem tajnym w następujących środowiskach obliczeniowych:

  • Środowisko Databricks Runtime 12.2 LTS lub nowsze w środowisku obliczeniowym skonfigurowanym z trybem dostępu pojedynczego użytkownika.
  • Środowisko Databricks Runtime 14.3 LTS lub nowsze w środowisku obliczeniowym skonfigurowanym z trybem dostępu współdzielonego.
  • Potoki danych Delta Live Tables skonfigurowane bez systemu Unity Catalog.

Usługa Azure Databricks nie obsługuje uwierzytelniania za pomocą Microsoft Entra ID z certyfikatem w żadnym środowisku obliczeniowym ani w potokach Delta Live Tables skonfigurowanych z Unity Catalog.

To uwierzytelnianie nie działa w udostępnionych klastrach ani w usłudze Unity Catalog Delta Live Tables.

Konfigurowanie łącznika platformy Kafka ze strukturą przesyłania strumieniowego

Aby przeprowadzić uwierzytelnianie przy użyciu identyfikatora Entra firmy Microsoft, potrzebne są następujące values:

  • Identyfikator dzierżawy. Tę pozycję można znaleźć na karcie usługi Microsoft Entra ID .

  • ClientID (znany również jako identyfikator aplikacji).

  • Wpis tajny klienta. Po utworzeniu tej funkcji należy dodać go jako wpis tajny do obszaru roboczego usługi Databricks. Aby dodać ten wpis tajny, zobacz Zarządzanie wpisami tajnymi.

  • Temat usługi EventHubs. Tematy list można znaleźć w sekcji Event Hubs pod sekcją Entities na określonej stronie przestrzeni nazw Event Hubs. Aby pracować z wieloma tematami, możesz set rolę IAM na poziomie usługi Event Hubs.

  • Serwer usługi EventHubs. Możesz to znaleźć na stronie przeglądu określonej przestrzeni nazw usługi Event Hubs:

    Przestrzeń nazw usługi Event Hubs

Ponadto, aby używać identyfikatora Entra, musimy poinformować platformę Kafka, aby korzystała z mechanizmu SASL OAuth (SASL jest protokołem ogólnym, a protokół OAuth jest typem "mechanizmu" SASL):

  • kafka.security.protocol powinna być SASL_SSL
  • kafka.sasl.mechanism powinna być OAUTHBEARER
  • kafka.sasl.login.callback.handler.class powinna być w pełni kwalifikowaną nazwą klasy Java z wartością kafkashaded dla procedury obsługi wywołania zwrotnego logowania zacienionej klasy kafka. Zobacz poniższy przykład, aby uzyskać dokładną klasę.

Przykład

Następnie przyjrzyjmy się uruchomionego przykładowi:

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,

// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

Obsługa potencjalnych błędów

  • Opcje przesyłania strumieniowego nie są obsługiwane.

    Jeśli spróbujesz użyć tego mechanizmu uwierzytelniania w potoku Delta Live Tables skonfigurowanym z użyciem Unity Catalog, może pojawić się następujący błąd:

    Nieobsługiwany błąd przesyłania strumieniowego

    Aby rozwiązać ten błąd, użyj obsługiwanej konfiguracji obliczeniowej. Zobacz Uwierzytelnianie jednostki usługi za pomocą identyfikatora Entra firmy Microsoft i usługi Azure Event Hubs.

  • Nie można utworzyć nowego KafkaAdminClientelementu .

    Jest to błąd wewnętrzny zgłaszany przez platformę Kafka, jeśli którakolwiek z następujących opcji uwierzytelniania jest niepoprawna:

    • Identyfikator klienta (znany również jako identyfikator aplikacji)
    • Identyfikator dzierżawy
    • Serwer Usługi EventHubs

    Aby rozwiązać ten problem, sprawdź, czy values są poprawne dla tych opcji.

    Ponadto ten błąd może zostać wyświetlony, jeśli zmodyfikujesz opcje konfiguracji podane domyślnie w przykładzie (które zostały poproszone o niezmodyfikowanie), takie jak kafka.security.protocol.

  • Nie są zwracane żadne rekordy

    Jeśli próbujesz wyświetlić lub przetworzyć ramkę danych, ale nie otrzymujesz wyników, w interfejsie użytkownika zobaczysz następujące informacje.

    Brak komunikatu o wynikach

    Ten komunikat oznacza, że uwierzytelnianie zakończyło się pomyślnie, ale usługa EventHubs nie zwróciła żadnych danych. Niektóre możliwe (choć w żaden sposób wyczerpujące) przyczyny są następujące:

    • Określono niewłaściwy temat usługi EventHubs .
    • Domyślną opcją startingOffsets konfiguracji platformy Kafka jest latest, a obecnie nie otrzymujesz żadnych danych za pośrednictwem tematu. Możesz setstartingOffsetstoearliest, aby rozpocząć odczytywanie danych od najwcześniejszych dostępnych przesunięć w Kafka.