Streamverwerking met Apache Kafka en Azure Databricks
In dit artikel wordt beschreven hoe u Apache Kafka kunt gebruiken als bron of sink bij het uitvoeren van structured streaming-workloads in Azure Databricks.
Zie de Kafka-documentatie voor meer Kafka.
Gegevens lezen uit Kafka
Hier volgt een voorbeeld voor een streaming-leesbewerking uit Kafka:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Azure Databricks biedt ook ondersteuning voor semantiek voor batchleesbewerkingen voor Kafka-gegevensbronnen, zoals wordt weergegeven in het volgende voorbeeld:
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
Voor het laden van incrementele batches raadt Databricks het gebruik van Kafka aan met Trigger.AvailableNow
. Zie Incrementele batchverwerking configureren.
In Databricks Runtime 13.3 LTS en hoger biedt Azure Databricks een SQL-functie voor het lezen van Kafka-gegevens. Streaming met SQL wordt alleen ondersteund in DLT of met streamingtabellen in Databricks SQL. Zie read_kafka
tabelwaardefunctie.
Kafka Structured Streaming-lezer configureren
Azure Databricks biedt het kafka
trefwoord als gegevensindeling voor het configureren van verbindingen met Kafka 0.10+.
Hier volgen de meest voorkomende configuraties voor Kafka:
Er zijn meerdere manieren waarop u kunt opgeven op welke onderwerpen u zich wilt abonneren. U moet slechts een van deze parameters opgeven:
Optie | Waarde | Beschrijving |
---|---|---|
abonneren | Een door komma's gescheiden lijst met onderwerpen. | De lijst met onderwerpen waarop u zich wilt abonneren. |
abonnerenPatroon | Java regex-tekenreeks. | Het patroon dat wordt gebruikt om u te abonneren op een of meer onderwerpen. |
toewijzen | JSON-tekenreeks {"topicA":[0,1],"topic":[2,4]} . |
Specifieke topicPartitions die moeten worden gebruikt. |
Andere belangrijke configuraties:
Optie | Waarde | Standaardwaarde | Beschrijving |
---|---|---|---|
kafka.bootstrap.servers | Een door komma's gescheiden lijst van host:poort. | leeg | [Vereist] De Kafka-configuratie bootstrap.servers . Als u merkt dat er geen gegevens uit Kafka zijn, controleert u eerst de adreslijst van de broker. Als de adreslijst van de broker onjuist is, zijn er mogelijk geen fouten. Dit komt doordat de Kafka-client ervan uitgaat dat de brokers uiteindelijk beschikbaar komen en in het geval van netwerkfouten voor altijd opnieuw proberen. |
failOnDataLoss |
true of false . |
true |
[Optioneel] Of de query mislukt wanneer het mogelijk is dat de gegevens verloren zijn gegaan. Query's kunnen gegevens uit Kafka permanent niet lezen vanwege veel scenario's, zoals verwijderde onderwerpen, afkapping van onderwerpen voordat ze worden verwerkt, enzovoort. We proberen conservatief te schatten of gegevens mogelijk verloren zijn gegaan of niet. Soms kan dit valse waarschuwingen veroorzaken. Stel deze optie in op false als deze niet werkt zoals verwacht of als u wilt dat de query blijft verwerken ondanks gegevensverlies. |
minPartitions | Integer >= 0, 0 = uitgeschakeld. | 0 (uitgeschakeld) | [Optioneel] Het minimale aantal partities dat uit Kafka gelezen moet worden. U kunt Spark configureren voor het gebruik van een willekeurig minimum aan partities om vanuit Kafka te lezen met behulp van de minPartitions optie. Normaal gesproken heeft Spark een 1-1 toewijzing van Kafka-topicpartities aan Spark-partities die Kafka consumeren. Als u de optie minPartitions instelt op een waarde die groter is dan uw Kafka topicPartitions, worden grote Kafka-partities door Spark verdeeld over kleinere onderdelen. Deze optie kan worden ingesteld tijdens piekbelastingen, scheve verdeling van gegevens en wanneer uw gegevensstroom achterop raakt, om de verwerkingssnelheid te verhogen. Het komt ten koste van het initialiseren van Kafka-consumenten bij elke trigger, wat de prestaties kan beïnvloeden als u SSL gebruikt bij het maken van verbinding met Kafka. |
kafka.group.id | Een Kafka-consumentengroep-id. | niet ingesteld | [Optioneel] Groeps-id die moet worden gebruikt tijdens het lezen uit Kafka. Gebruik dit met voorzichtigheid. Standaard genereert elke query een unieke groeps-id voor het lezen van gegevens. Dit zorgt ervoor dat elke query een eigen consumentengroep heeft die geen interferentie van een andere consument ondervindt en daarom alle partities van de geabonneerde onderwerpen kan lezen. In sommige scenario's (bijvoorbeeld autorisatie op basis van kafka-groepen), kunt u specifieke geautoriseerde groeps-id's gebruiken om gegevens te lezen. U kunt desgewenst de groeps-id instellen. Doe dit echter met extreme voorzichtigheid, omdat dit onverwacht gedrag kan veroorzaken. - Gelijktijdig uitgevoerde query's (zowel batch- als streaming) met dezelfde groeps-id beïnvloeden waarschijnlijk elkaar, waardoor elke query slechts een deel van de gegevens kan lezen. - Dit kan ook gebeuren wanneer query's snel na elkaar worden gestart/opnieuw gestart. Als u dergelijke problemen wilt minimaliseren, stelt u de Kafka-consumer configuratie in op zeer klein. |
startingOffsets | vroegste, meest recente | nieuwste | [Optioneel] Het beginpunt waarop een query wordt gestart, ofwel 'vroegst' die afkomstig is van de vroegste verschuivingen, of een json-tekenreeks die een beginverschil voor elke TopicPartition opgeeft. In de json kan -2 als offset worden gebruikt om te verwijzen naar de vroegste, -1 naar de meest recente. Opmerking: Voor batchquery's is de meest recente (impliciet of met behulp van -1 in json) niet toegestaan. Voor streaming-queries geldt dit alleen wanneer een nieuwe query wordt gestart en het hervatten altijd verdergaat vanaf het punt waar de query was gebleven. Nieuwe gedetecteerde partities tijdens een query beginnen ten vroegste. |
Zie de integratiehandleiding voor Structured Streaming Kafka voor andere optionele configuraties.
Schema voor Kafka-records
Het schema van Kafka-records is:
Kolom | Type |
---|---|
sleutel | binair |
waarde | binair |
onderwerp | tekenreeks |
verdelen | int |
compensatie | lang |
tijdstempel | lang |
tijdaanduidingstype | int |
De key
en de value
worden altijd gedeserialiseerd als bytematrices met de ByteArrayDeserializer
. Gebruik DataFrame-bewerkingen (zoals cast("string")
) om de sleutels en waarden expliciet deserialiseren.
Gegevens schrijven naar Kafka
Hier volgt een voorbeeld van een streaming-schrijfbewerking naar Kafka:
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Azure Databricks ondersteunt ook semantiek voor batch-schrijfbewerkingen naar Kafka-gegevenssinks, zoals wordt weergegeven in het volgende voorbeeld:
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
De kafka Structured Streaming Writer configureren
Belangrijk
Databricks Runtime 13.3 LTS en hoger bevat een nieuwere versie van de kafka-clients
bibliotheek waarmee idempotente schrijfbewerkingen standaard worden ingeschakeld. Als een Kafka-sink versie 2.8.0 of lager gebruikt met ACL's die zijn geconfigureerd, maar zonder IDEMPOTENT_WRITE
ingeschakeld, mislukt de schrijfbewerking met het foutbericht org.apache.kafka.common.KafkaException:
Cannot execute transactional method because we are in an error state
.
Los deze fout op door een upgrade uit te voeren naar Kafka versie 2.8.0 of hoger, of door de instelling .option(“kafka.enable.idempotence”, “false”)
tijdens het configureren van uw Structured Streaming Writer.
Het schema dat aan de DataStreamWriter is verstrekt, communiceert met de Kafka-sink. U kunt de volgende velden gebruiken:
Kolomnaam | Vereist of optioneel | Type |
---|---|---|
key |
optioneel |
STRING of BINARY |
value |
vereist |
STRING of BINARY |
headers |
optioneel | ARRAY |
topic |
optioneel (genegeerd wanneer topic is ingesteld als schrijver optie) |
STRING |
partition |
optioneel | INT |
Hier volgen algemene opties die zijn ingesteld tijdens het schrijven naar Kafka:
Optie | Waarde | Standaardwaarde | Beschrijving |
---|---|---|---|
kafka.boostrap.servers |
Een door komma's gescheiden lijst met <host:port> |
Geen | [Vereist] De Kafka-configuratie bootstrap.servers . |
topic |
STRING |
niet ingesteld | [Optioneel] Hiermee stelt u het onderwerp in voor alle rijen die moeten worden geschreven. Met deze optie worden alle onderwerpkolommen overschreven die in de gegevens aanwezig zijn. |
includeHeaders |
BOOLEAN |
false |
[Optioneel] Of u de Kafka-headers in de rij wilt opnemen. |
Zie de integratiehandleiding voor Structured Streaming Kafka voor andere optionele configuraties.
Kafka-metrieken ophalen
U kunt het gemiddelde, het minimum en het maximum van het aantal offsets ophalen waarmee de streamingquery achterblijft op de laatst beschikbare offset onder alle geabonneerde onderwerpen, met de avgOffsetsBehindLatest
, maxOffsetsBehindLatest
, en minOffsetsBehindLatest
metriek. Zie Interactief metrische gegevens lezen.
Notitie
Beschikbaar in Databricks Runtime 9.1 en hoger.
Haal het geschatte totale aantal bytes op dat het queryproces niet heeft verbruikt uit de geabonneerde onderwerpen door de waarde van estimatedTotalBytesBehindLatest
te onderzoeken. Deze schatting is gebaseerd op de batches die in de afgelopen 300 seconden zijn verwerkt. Het tijdsbestek waarop de schatting is gebaseerd, kan worden gewijzigd door de optie bytesEstimateWindowLength
in te stellen op een andere waarde. Als u deze bijvoorbeeld wilt instellen op 10 minuten:
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Als u de stream uitvoert in een notebook, ziet u deze metrische gegevens op het tabblad Onbewerkte gegevens in het voortgangsdashboard voor streamingquery's:
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
SSL gebruiken om Azure Databricks te verbinden met Kafka
Volg de instructies in de Confluent-documentatie Encryption and Authentication met SSLom SSL-verbindingen met Kafka in te schakelen. U kunt de configuraties opgeven die daar worden beschreven, voorafgegaan door kafka.
, als opties. U geeft bijvoorbeeld de locatie van de truststore op in de eigenschap kafka.ssl.truststore.location
.
Databricks raadt u het volgende aan:
- Sla uw certificaten op in de opslag van cloudobjecten. U kunt de toegang tot de certificaten alleen beperken tot clusters die toegang hebben tot Kafka. Zie Gegevensbeheer met Unity Catalog.
- Sla uw certificaatwachtwoorden op als geheimen in een geheim bereik.
In het volgende voorbeeld worden objectopslaglocaties en Databricks-geheimen gebruikt om een SSL-verbinding in te schakelen:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Kafka in HDInsight verbinden met Azure Databricks
Maak een HDInsight Kafka-cluster.
Zie Verbinding maken met Kafka in HDInsight via een virtueel Azure-netwerk voor instructies.
Configureer de Kafka-brokers om het juiste adres te adverteren.
Volg de instructies in Kafka configureren voor IP-reclame. Als u Kafka zelf beheert op virtuele Azure-machines, moet u ervoor zorgen dat de
advertised.listeners
configuratie van de brokers is ingesteld op het interne IP-adres van de hosts.Maak een Azure Databricks-cluster.
Koppel het Kafka-cluster aan het Azure Databricks-cluster.
Volg de instructies in virtuele peernetwerken.
Verificatie van service-principal met Microsoft Entra-id en Azure Event Hubs
Azure Databricks ondersteunt de verificatie van Spark-taken met Event Hubs-services. Deze verificatie wordt uitgevoerd via OAuth met Microsoft Entra-id.
Azure Databricks biedt ondersteuning voor Microsoft Entra ID-verificatie met een client-id en geheim in de volgende rekenomgevingen:
- Databricks Runtime 12.2 LTS en hoger op rekenkracht die is geconfigureerd met de toegewezen toegangsmodus (voorheen de modus voor toegang van één gebruiker).
- Databricks Runtime 14.3 LTS en hoger op rekenkracht die is geconfigureerd met de standaard toegangsmodus (voorheen gedeelde toegangsmodus).
- DLT-pijplijnen die zijn geconfigureerd zonder Unity Catalog.
Azure Databricks biedt geen ondersteuning voor Microsoft Entra ID-verificatie met een certificaat in een rekenomgeving of in DLT-pijplijnen die zijn geconfigureerd met Unity Catalog.
Deze verificatie werkt niet op computersystemen met de standaard toegangsmethode of op Unity Catalog DLT.
Structured Streaming Kafka-connector configureren
Voor het uitvoeren van verificatie met Microsoft Entra-id hebt u de volgende waarden nodig:
Een tenant-id. U vindt dit op het tabblad Services van Microsoft Entra ID .
Een clientID (ook wel toepassings-id genoemd).
Een clientgeheim. Zodra u dit hebt, moet u het toevoegen als een geheim aan uw Databricks-werkruimte. Zie Geheimbeheer om dit geheim toe te voegen.
Een EventHubs-onderwerp. U vindt een lijst met onderwerpen in de sectie Event Hubs onder de sectie Entiteiten op een specifieke Event Hubs-naamruimte pagina. Als u met meerdere onderwerpen wilt werken, kunt u de IAM-rol instellen op Event Hubs-niveau.
Een EventHubs-server. U vindt dit op de overzichtspagina van uw specifieke Event Hubs-naamruimte:
Daarnaast moeten we Kafka vertellen dat we Entra ID moeten gebruiken en het OAuth SASL-mechanisme moeten toepassen (SASL is een algemeen protocol en OAuth is een type SASL-mechanisme):
-
kafka.security.protocol
moet zijnSASL_SSL
-
kafka.sasl.mechanism
moet zijnOAUTHBEARER
-
kafka.sasl.login.callback.handler.class
moet een volledig gekwalificeerde naam van de Java-klasse zijn met een waarde vankafkashaded
voor de callback-handler voor aanmelding van onze gearceerde Kafka-klasse. Zie het volgende voorbeeld voor de exacte klasse.
Voorbeeld
Laten we nu eens kijken naar een doorlopend voorbeeld.
Python
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
Mogelijke fouten afhandelen
Streamingopties worden niet ondersteund.
Als u dit verificatiemechanisme probeert te gebruiken in een DLT-pijplijn die is geconfigureerd met Unity Catalog, krijgt u mogelijk de volgende fout:
Gebruik een ondersteunde rekenconfiguratie om deze fout op te lossen. Zie Service Principal-verificatie met Microsoft Entra ID en Azure Event Hubs.
Kan geen nieuwe
KafkaAdminClient
maken.Dit is een interne fout die Kafka genereert als een van de volgende verificatieopties onjuist is:
- Client-id (ook wel toepassings-id genoemd)
- Tenant ID
- EventHubs-server
Als u de fout wilt oplossen, controleert u of de waarden juist zijn voor deze opties.
Bovendien ziet u deze fout mogelijk als u de standaardconfiguratieopties wijzigt die in het voorbeeld zijn opgegeven (dat u bent gevraagd om niet te wijzigen), zoals
kafka.security.protocol
.Er worden geen records geretourneerd
Als u uw DataFrame probeert weer te geven of te verwerken, maar geen resultaten krijgt, ziet u het volgende in de gebruikersinterface.
Dit bericht betekent dat de verificatie is geslaagd, maar EventHubs geen gegevens heeft geretourneerd. Een aantal mogelijke (hoewel niet volledig) redenen zijn:
- U hebt het verkeerde EventHubs-onderwerp opgegeven.
- De standaardoptie voor
startingOffsets
Kafka-configuratie islatest
, en u ontvangt momenteel nog geen gegevens via het topic. U kuntstartingOffsetstoearliest
instellen om gegevens te lezen vanuit de vroegste Kafka-offsets.