Dela via


Dataströmbearbetning med Apache Kafka och Azure Databricks

I den här artikeln beskrivs hur du kan använda Apache Kafka som källa eller mottagare när du kör arbetsbelastningar för strukturerad direktuppspelning på Azure Databricks.

Mer information om Kafka finns i Kafka-dokumentationen.

Läsa data från Kafka

Följande är ett exempel på en strömningsläsning från Kafka:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Azure Databricks stöder även batchläsningssemantik för Kafka-datakällor, som du ser i följande exempel:

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

För inkrementell batchinläsning rekommenderar Databricks att du använder Kafka med Trigger.AvailableNow. Se Konfigurera inkrementell batchbearbetning.

I Databricks Runtime 13.3 LTS och senare tillhandahåller Azure Databricks en SQL-funktion för att läsa Kafka-data. Strömning med SQL stöds endast i Delta Live Tables eller med strömning tables i Databricks SQL. Se read_kafka tablevärdefunktion.

Konfigurera Kafka Structured Streaming-läsare

Azure Databricks tillhandahåller nyckelordet kafka som ett dataformat för att konfigurera connections till Kafka 0.10+.

Följande är de vanligaste konfigurationerna för Kafka:

Det finns flera sätt att ange vilka ämnen som ska prenumerera på. Du bör bara ange en av dessa parameters:

Alternativ Värde beskrivning
prenumerera Ett kommaavgränsat list med ämnen. Ämnet list att prenumerera på.
subscribePattern Java regex-sträng. Det mönster som används för att prenumerera på ämnen.
tilldela JSON-sträng {"topicA":[0,1],"topic":[2,4]}. Specifika topicPartitioner att använda.

Andra viktiga konfigurationer:

Alternativ Värde Standardvärde beskrivning
kafka.bootstrap.servers Kommaavgränsad list värd:port. empty [Krävs] Kafka-konfigurationen bootstrap.servers . Om du upptäcker att det inte finns några data från Kafka kontrollerar du koordinatoradressen list först. Om koordinatoradressen list är felaktig kanske det inte finns några fel. Detta beror på att Kafka-klienten förutsätter att koordinatorerna blir tillgängliga så småningom och om nätverksfel försöker igen för alltid.
failOnDataLoss true eller false. true [Valfritt] Om frågan ska misslyckas när det är möjligt att data har gått förlorade. Frågor kan permanent misslyckas med att läsa data från Kafka på grund av många scenarier, till exempel borttagna ämnen, ämnestrunkering före bearbetning och så vidare. Vi försöker uppskatta om data eventuellt har gått förlorade eller inte. Ibland kan detta orsaka falska larm. Set det här alternativet till false om det inte fungerar som önskat, eller om du vill att förfrågan ska fortsätta behandlingen trots dataförlust.
minPartitions Heltal >= 0, 0 = inaktiverat. 0 (inaktiverad) [Valfritt] Minsta antal partitioner som ska läsas från Kafka. Du kan konfigurera Spark att använda ett godtyckligt minimum av partitioner för att läsa från Kafka med hjälp minPartitions av alternativet . Normalt har Spark en 1-1-mappning av Kafka topicPartitions till Spark-partitioner som förbrukar från Kafka. Om du set alternativet minPartitions till ett värde som är större än dina Kafka topicPartitions kan Spark dela upp stora Kafka-partitioner till mindre delar. Det här alternativet kan vara set vid tider med hög belastning, datasnedvridning och när strömmen ligger efter för att öka bearbetningshastigheten. Det kostar att initiera Kafka-konsumenter vid varje utlösare, vilket kan påverka prestanda om du använder SSL när du ansluter till Kafka.
kafka.group.id Ett Kafka-konsumentgrupps-ID. inte set [Valfritt] Grupp-ID som ska användas vid läsning från Kafka. Använd detta med försiktighet. Som standard genererar varje fråga ett unikt grupp-ID för att läsa data. Detta säkerställer att varje fråga har en egen konsumentgrupp som inte utsätts för interferens från någon annan konsument och därför kan läsa alla partitioner i sina prenumerationsavsnitt. I vissa scenarier (till exempel Kafka-gruppbaserad auktorisering) kanske du vill använda specifika auktoriserade grupp-ID:er för att läsa data. Du kan också set grupp-ID:t. Men gör detta med extrem försiktighet eftersom det kan orsaka oväntat beteende.

– Frågor som körs samtidigt (både batch och direktuppspelning) med samma grupp-ID stör sannolikt varandra, vilket gör att varje fråga endast läser en del av data.
– Detta kan också inträffa när frågor startas/startas om i snabb följd. För att minimera sådana problem set Kafka-konsumentkonfigurationen session.timeout.ms vara mycket liten.
startingOffsets tidigaste , senaste senaste [Valfritt] Startpunkten när en fråga startas, antingen "tidigast" som är från de tidigaste förskjutningarna, eller en json-sträng som anger en start-offset för varje TopicPartition. I json kan -2 som en offset användas för att referera till tidigaste, -1 till senaste. Obs! För batchfrågor tillåts inte den senaste (implicit eller med hjälp av -1 i json). För strömmande frågor gäller detta bara när en ny fråga startas, och återupptagningen sker alltid från där where frågan avslutades. Nyligen identifierade partitioner under en fråga startar tidigast.

Se Integrationsguide för strukturerad direktuppspelning kafka för andra valfria konfigurationer.

Schema för Kafka-poster

schema av Kafka-loggarna är:

Column Typ
key binary
värde binary
Avsnitt sträng
partition heltal
offset lång
timestamp lång
timestampType heltal

key Och value är alltid deserialiserade som bytematriser med ByteArrayDeserializer. Använd DataFrame-åtgärder (till exempel cast("string")) för att explicit deserialisera nycklarna och values.

Skriva data till Kafka

Följande är ett exempel på en direktuppspelningsskrivning till Kafka:

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Azure Databricks stöder även batchskrivningssemantik till Kafka-datamottagare, som du ser i följande exempel:

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Konfigurera Kafka Structured Streaming-skrivaren

Viktigt!

Databricks Runtime 13.3 LTS och senare innehåller en nyare version av kafka-clients biblioteket som aktiverar idempotent-skrivningar som standard. Om en Kafka-mottagare använder version 2.8.0 eller senare med ACL:er konfigurerade, men utan IDEMPOTENT_WRITE aktiverad, misslyckas skrivning med felmeddelandet org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state.

Lös det här felet genom att uppgradera till Kafka version 2.8.0 eller senare, eller genom att ange .option(“kafka.enable.idempotence”, “false”) när du konfigurerar din structured streaming-skrivare.

Den schema som tillhandahålls till DataStreamWriter interagerar med Kafka-sink. Du kan använda följande fält:

Column namn Obligatorisk eller valfri Typ
key valfri STRING eller BINARY
value required STRING eller BINARY
headers valfri ARRAY
topic valfritt (ignoreras om topic är set som skrivalternativ) STRING
partition valfri INT

Följande är vanliga alternativ set när du skriver till Kafka:

Alternativ Värde Standardvärde beskrivning
kafka.boostrap.servers Ett kommaavgränsat list av <host:port> inget [Krävs] Kafka-konfigurationen bootstrap.servers .
topic STRING inte set [Valfritt] Anger ämnet för alla rader som ska skrivas. Det här alternativet åsidosätter alla ämnen column som finns i data.
includeHeaders BOOLEAN false [Valfritt] Om Kafka-rubrikerna ska inkluderas på raden.

Se Integrationsguide för strukturerad direktuppspelning kafka för andra valfria konfigurationer.

Hämta Kafka-mått

Du kan get medelvärdet, min och max för antalet förskjutningar som strömningsfrågan ligger bakom den senaste tillgängliga offset bland alla prenumerationsämnen med måtten avgOffsetsBehindLatest, maxOffsetsBehindLatestoch minOffsetsBehindLatest. Se Läsa mått interaktivt.

Kommentar

Tillgänglig i Databricks Runtime 9.1 och senare.

Get det uppskattade totala antalet byte som frågeprocessen inte har förbrukat från de prenumererade ämnena genom att undersöka värdet för estimatedTotalBytesBehindLatest. Den här uppskattningen baseras på de batchar som bearbetats under de senaste 300 sekunderna. Den tidsram som uppskattningen baseras på kan ändras genom att alternativet bytesEstimateWindowLength anges till ett annat värde. Om du till exempel vill set den till 10 minuter:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Om du kör dataströmmen i en notebook-fil kan du se dessa mått under fliken Rådata på instrumentpanelen för strömningsfrågans förlopp:

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    },
  } ]
}

Använda SSL för att ansluta Azure Databricks till Kafka

Om du vill aktivera SSL-connections till Kafka följer du anvisningarna i Confluent-dokumentationen Kryptering och autentisering med SSL. Du kan ange de konfigurationer som beskrivs där, prefixet med kafka., som alternativ. Du kan till exempel ange platsen för förtroendearkivet i egenskapen kafka.ssl.truststore.location.

Databricks rekommenderar att du:

I följande exempel används objektlagringsplatser och Databricks-hemligheter för att aktivera en SSL-anslutning:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Ansluta Kafka på HDInsight till Azure Databricks

  1. Skapa ett HDInsight Kafka-kluster.

    Mer information finns i Ansluta till Kafka i HDInsight via ett virtuellt Azure-nätverk .

  2. Konfigurera Kafka-koordinatorerna så att de annonserar rätt adress.

    Följ anvisningarna i Konfigurera Kafka för IP-reklam. Om du hanterar Kafka själv på Azure Virtual Machines kontrollerar du att advertised.listeners konfigurationen av asynkrona meddelandeköer set till värdarnas interna IP-adress.

  3. Skapa ett Azure Databricks-kluster.

  4. Peer-koppla Kafka-klustret till Azure Databricks-klustret.

    Följ anvisningarna i peer-virtuella nätverk.

Autentisering med tjänstens huvudnamn med Microsoft Entra-ID och Azure Event Hubs

Azure Databricks stöder autentisering av Spark-jobb med Event Hubs-tjänster. Den här autentiseringen görs via OAuth med Microsoft Entra-ID.

AAD-autentiseringsdiagram

Azure Databricks stöder Microsoft Entra-ID-autentisering med ett klient-ID och en hemlighet i följande beräkningsmiljöer:

  • Databricks Runtime 12.2 LTS och senare vid beräkning som konfigurerats med åtkomstläge för en användare.
  • Databricks Runtime 14.3 LTS och senare vid beräkning som konfigurerats med läget för delad åtkomst.
  • Delta Live Tables-pipelines som konfigurerats utan Unity Catalog.

Azure Databricks stöder inte Microsoft Entra-ID-autentisering med ett certifikat i någon beräkningsmiljö eller i Delta Live Tables pipelines som konfigurerats med Unity Catalog.

Den här autentiseringen fungerar inte på delade kluster eller på Unity Catalog Delta Live Tables.

Konfigurera Kafka-anslutningsappen för strukturerad direktuppspelning

För att kunna utföra autentisering med Microsoft Entra-ID behöver du följande values:

  • Ett klientorganisations-ID. Du hittar detta på fliken Microsoft Entra ID-tjänster .

  • Ett clientID (även kallat program-ID).

  • En klienthemlighet. När du har det här bör du lägga till det som en hemlighet i din Databricks-arbetsyta. Information om hur du lägger till den här hemligheten finns i Hemlighetshantering.

  • Ett EventHubs-ämne. Du kan hitta ett antal list ämnen i Event Hubs-sektionen under avsnittet Entiteter på en specifik Event Hubs-namnområde-sida. Om du vill arbeta med flera ämnen kan du set IAM-rollen på Event Hubs-nivå.

  • En EventHubs-server. Du hittar detta på översiktssidan för ditt specifika Event Hubs-namnområde:

    Event Hubs namnrymd

För att kunna använda Entra-ID måste vi dessutom be Kafka att använda OAuth SASL-mekanismen (SASL är ett generiskt protokoll och OAuth är en typ av SASL-mekanism):

  • kafka.security.protocol bör vara SASL_SSL
  • kafka.sasl.mechanism bör vara OAUTHBEARER
  • kafka.sasl.login.callback.handler.class bör vara ett fullständigt kvalificerat namn på Java-klassen med värdet kafkashaded för till hanteraren för återanrop för inloggning för vår skuggade Kafka-klass. Se följande exempel för den exakta klassen.

Exempel

Nu ska vi titta på ett exempel som körs:

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,

// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

Hantera potentiella fel

  • Strömningsalternativ stöds inte.

    Om du försöker använda den här autentiseringsmekanismen i en Delta Live-Tables pipeline som konfigurerats med Unity Catalog kan du få följande fel:

    Strömningsfel som inte stöds

    Lös det här felet genom att använda en beräkningskonfiguration som stöds. Se Autentisering med tjänstens huvudnamn med Microsoft Entra-ID och Azure Event Hubs.

  • Det gick inte att skapa en ny KafkaAdminClient.

    Det här är ett internt fel som Kafka genererar om något av följande autentiseringsalternativ är felaktigt:

    • Klient-ID (även kallat program-ID)
    • Klientorganisations-ID
    • EventHubs-server

    Du kan lösa felet genom att kontrollera att values är korrekta för de här alternativen.

    Dessutom kan det här felet visas om du ändrar konfigurationsalternativen som anges som standard i exemplet (som du uppmanas att inte ändra), till exempel kafka.security.protocol.

  • Det finns inga poster som returneras

    Om du försöker visa eller bearbeta din DataFrame men inte får resultat visas följande i användargränssnittet.

    Inget resultatmeddelande

    Det här meddelandet innebär att autentiseringen lyckades, men EventHubs returnerade inga data. Några möjliga (men inte på något sätt uttömmande) skäl är:

    • Du har angett fel EventHubs-ämne .
    • Standardalternativet kafka-konfiguration för startingOffsets är latest, och du tar för närvarande inte emot några data via ämnet ännu. Du kan setstartingOffsetstoearliest för att börja läsa data från Kafkas tidigaste offsetar.