Traitement de flux avec Apache Kafka et Azure Databricks
Cet article explique comment utiliser Apache Kafka en tant que source ou récepteur lors de l’exécution de charges de travail Structured Streaming sur Azure Databricks.
Pour plus d’informations sur Kafka, consultez la documentation Kafka.
Lire les données de Kafka
Voici un exemple pour une lecture en continu à partir de Kafka :
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Azure Databricks prend également en charge la sémantique de lecture par lots pour les sources de données Kafka, comme illustré dans l’exemple suivant :
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
Pour le chargement par lots incrémentiel, Databricks recommande d’utiliser Kafka avec Trigger.AvailableNow
. Consultez Configuration du traitement par lots incrémentiel.
Dans Databricks Runtime 13.3 LTS et versions ultérieures, Azure Databricks fournit une fonction SQL pour lire les données Kafka. Le streaming avec SQL est pris en charge uniquement dans Delta Live Tables ou avec les tables de streaming dans Databricks SQL. Voir fonction table read_kafka.
Configurer le lecteur Kafka de flux structuré
Azure Databricks fournit le mot clé kafka
en tant que format de données pour configurer les connexions à Kafka 0.10+.
Voici les configurations les plus courantes pour Kafka :
Il existe plusieurs façons de spécifier des rubriques auxquelles s’abonner. Vous ne devez fournir qu’un seul de ces paramètres :
Option | Valeur | Description |
---|---|---|
subscribe | Liste séparée par des virgules des rubriques. | Liste de rubriques auxquelles s’abonner. |
subscribePattern | Chaîne regex Java. | Modèle utilisé pour s’abonner à une ou plusieurs rubriques. |
attribuer | Chaîne JSON {"topicA":[0,1],"topic":[2,4]} . |
TopicPartitions spécifique à consommer. |
Autres configurations notables :
Option | Valeur | Valeur par défaut | Description |
---|---|---|---|
kafka.bootstrap.servers | Liste séparée par des virgules de host:port. | empty | [Obligatoire] Configuration de bootstrap.servers Kafka. Si vous constatez qu’il n’y a aucune donnée de Kafka, vérifiez d’abord la liste d’adresses du répartiteur. Si la liste d’adresses du répartiteur est incorrecte, il se peut qu’il n’y ait aucune erreur. Cela est dû au fait que le client Kafka part du principe que les répartiteurs deviendront finalement disponibles et qu’en cas d’erreurs réseau, ils effectueront de nouvelles tentatives indéfiniment. |
failOnDataLoss | true ou false . |
true |
[Facultatif] Indique s’il faut faire échouer la requête quand une perte de données est possible. Les requêtes peuvent échouer de façon permanente à lire des données à partir de Kafka dans de nombreux cas, tels que la suppression de rubriques, une troncation de rubrique avant traitement, etc. Nous essayons d’estimer prudemment si une perte de données est ou non possible. Cela peut parfois déclencher de fausses alarmes. Définissez cette option sur false si elle ne fonctionne pas comme prévu, ou si vous souhaitez que la requête poursuive le traitement en dépit d’une perte de données. |
minPartitions | Integer >= 0, 0 = disabled. | 0 (désactivé) | [Facultatif] Nombre minimal de partitions à lire à partir de Kafka. Vous pouvez configurer Spark pour utiliser un minimum arbitraire de partitions à lire à partir de Kafka à l’aide de l’option minPartitions . Normalement, Spark a un mappage de 1-1 des topicPartitions Kafka aux partitions Spark consommatrices de Kafka. Si vous définissez l’option minPartitions sur une valeur supérieure à celle de vos topicPartitions Kafka, Spark divise les partitions Kafka volumineuses en éléments plus petits. Cette option peut être définie aux heures des pics de charge, en cas d’asymétrie des données et lorsque votre flux prend du retard afin d’augmenter la vitesse de traitement. Cela a un coût lié à l’initialisation des consommateurs Kafka à chaque déclenchement, ce qui peut avoir un impact sur les performances si vous utilisez SSL lors de la connexion à Kafka. |
kafka.group.id | ID de groupe de consommateurs Kafka | non défini | [Facultatif] ID de groupe à utiliser lors de la lecture à partir de Kafka. Utilisez cet option avec prudence. Par défaut, chaque requête génère un ID de groupe unique pour la lecture des données. Cela garantit que chaque requête a son propre groupe de consommateurs qui ne rencontre pas d’interférence de la part d’un autre consommateur, et peut donc lire toutes les partitions des rubriques auxquelles il est abonné. Dans certains cas (par exemple, autorisation basée sur un groupe Kafka), vous pouvez utiliser des ID de groupe autorisé spécifique pour lire les données. Vous pouvez éventuellement définir l’ID de groupe. Toutefois, faites cela avec une extrême prudence, car cela peut provoquer un comportement inattendu. - L’exécution simultanée de requêtes (tant de traitement par lots que de diffusion en continu) avec le même ID de groupe peut entraîner une interférence entre elles ayant pour effet que chaque requête ne lit qu’une partie des données. - Cela peut également se produire quand des requêtes sont démarrées/redémarrées rapidement. Pour minimiser de tels problèmes, définissez la configuration de consommateur Kafka session.timeout.ms sur une configuration très petite. |
startingOffsets | earliest , latest | latest | [Facultatif] Point de départ lors du démarrage d’une requête, soit « earliest » qui correspond aux décalages les plus précoces, soit une chaîne json spécifiant un décalage de départ pour chaque TopicPartition. Dans le json, -2 peut être utilisé pour indiquer le décalage le plus précoce, -1 pour indiquer le décalage le plus récent. Remarque : pour les requêtes par lot, le décalage le plus récent (implicitement ou à l’aide de -1 en json) n’est pas autorisé. Pour les requêtes de diffusion en continu, cela ne s’applique que lors du lancement d’une nouvelle requête, et la reprise reprendra toujours là où la requête s’est arrêtée. Les partitions nouvellement découvertes au cours d’une requête démarreront au plus tôt. |
Pour d’autres configurations facultatives, consultez le Guide d’intégration de diffusion en continu structurée Kafka.
Schéma pour les enregistrements Kafka
Le schéma des enregistrements Kafka est le suivant :
Colonne | Type |
---|---|
key | binary |
value | binary |
topic | string |
partition | int |
offset | long |
timestamp | long |
timestampType | int |
Les key
et value
sont toujours désérialisées en tant que tableaux d’octets avec le ByteArrayDeserializer
. Utilisez des opérations DataFrame (telles que cast("string")
) pour désérialiser explicitement les clés et les valeurs.
Écrire des données dans Kafka
Voici un exemple ci-après pour une écriture en streaming dans Kafka :
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Azure Databricks prend également en charge la sémantique d’écriture dans des récepteurs de données Kafka, comme illustré dans l’exemple suivant :
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Configurer le lecteur Kafka Structured Streaming
Important
Databricks Runtime 13.3 LTS et versions ultérieures incluent une version plus récente de la bibliothèque kafka-clients
qui active des écritures idempotentes par défaut. Si un récepteur Kafka utilise la version 2.8.0 ou antérieure avec des listes de contrôle d’accès configurées, mais sans activation de IDEMPOTENT_WRITE
, l’écriture échoue avec le message d’erreur org.apache.kafka.common.KafkaException:
Cannot execute transactional method because we are in an error state
.
Résolvez cette erreur en mettant à niveau vers Kafka version 2.8.0 ou ultérieure, ou en définissant .option(“kafka.enable.idempotence”, “false”)
pendant la configuration de votre enregistreur Structured Streaming.
Le schéma fourni à DataStreamWriter interagit avec le récepteur Kafka. Vous pouvez utiliser les champs suivants :
Nom de la colonne | Obligatoire ou facultatif | Type |
---|---|---|
key |
facultatif | STRING ou BINARY |
value |
Obligatoire | STRING ou BINARY |
headers |
facultatif | ARRAY |
topic |
facultatif (ignoré si topic est défini comme option d’enregistreur) |
STRING |
partition |
facultatif | INT |
Voici les options courantes définies lors de l’écriture dans Kafka :
Option | Valeur | Valeur par défaut | Description |
---|---|---|---|
kafka.boostrap.servers |
Une liste des machines virtuelles séparée par des virgules de <host:port> |
Aucun | [Obligatoire] Configuration de bootstrap.servers Kafka. |
topic |
STRING |
non défini | [Facultatif] Définit la rubrique pour toutes les lignes à écrire. Cette option remplace toute colonne de rubrique qui existe dans les données. |
includeHeaders |
BOOLEAN |
false |
[Facultatif] Indique s’il faut inclure les en-têtes Kafka dans la ligne. |
Pour d’autres configurations facultatives, consultez le Guide d’intégration de diffusion en continu structurée Kafka.
Récupérer des métriques Kafka
Vous pouvez obtenir la moyenne, le minimum et le maximum du nombre de décalages de requête de diffusion en continu par rapport au dernier décalage disponible parmi toutes les rubriques souscrites avec les métriques avgOffsetsBehindLatest
, maxOffsetsBehindLatest
et minOffsetsBehindLatest
. Consultez Lecture des métriques de manière interactive.
Notes
Disponible dans Databricks Runtime 9.1 et versions ultérieures.
Obtenez le nombre total estimé d’octets que le processus de requête n’a pas consommés à partir des rubriques souscrites en examinant la valeur de estimatedTotalBytesBehindLatest
. Cette estimation est basée sur les lots qui ont été traités au cours des 300 dernières secondes. La période sur laquelle l’estimation est basée peut être modifiée en définissant l’option bytesEstimateWindowLength
sur une valeur différente. Par exemple, pour la définir sur 10 minutes :
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Si vous exécutez le flux dans un notebook, vous pouvez voir ces métriques sous l’onglet Données brutes du tableau de bord de progression des requêtes de diffusion en continu :
{
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic]]",
"metrics" : {
"avgOffsetsBehindLatest" : "4.0",
"maxOffsetsBehindLatest" : "4",
"minOffsetsBehindLatest" : "4",
"estimatedTotalBytesBehindLatest" : "80.0"
},
} ]
}
Utiliser SSL pour connecter Azure Databricks à Kafka
Pour activer les connexions SSL à Kafka, suivez les instructions fournies dans la documentation de Confluent Chiffrement et authentification avec SSL. Vous pouvez fournir les configurations décrites ici, préfixées par kafka.
, comme options. Par exemple, vous spécifiez l’emplacement du magasin de confiance dans la propriété kafka.ssl.truststore.location
.
Databricks vous recommande de :
- Stocker vos certificats dans le stockage d’objets cloud. Vous pouvez restreindre l’accès aux certificats uniquement aux clusters qui peuvent accéder à Kafka. Consultez Gouvernance des données avec Unity Catalog.
- Stockez vos mots de passe de certificat en tant que secrets dans une étendue de secrets.
L’exemple suivant utilise des emplacements de stockage d’objets et des secrets Databricks pour activer une connexion SSL :
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Connecter Kafka sur HDInsight à Azure Databricks
Créez un cluster Kafka sur HDInsight.
Pour obtenir des instructions, consultez Se connecter à Kafka sur HDInsight via un réseau virtuel Azure.
Configurez les répartiteurs Kafka pour qu’ils publient l’adresse correcte.
Suivez les instructions fournies dans Configurer Kafka pour la publication d’adresses IP. Si vous gérez Kafka vous-même sur des machines virtuelles Azure, assurez-vous que la configuration
advertised.listeners
des courtiers est définie sur l’adresse IP interne des hôtes.Créez un cluster Azure Databricks
Appairez le cluster Kafka au cluster Azure Databricks.
Suivez les instructions fournies dans Appairer des réseaux virtuels.
Authentification du principal de service avec Microsoft Entra ID et Azure Event Hubs
Azure Databricks prend en charge l’authentification des travaux Spark avec les services Event Hubs. Cette authentification se fait par le biais d’OAuth avec Microsoft Entra ID.
Azure Databricks prend en charge l’authentification Microsoft Entra ID avec un ID client et un secret dans les environnements Compute suivants :
- Databricks Runtime 12.2 LTS et versions ultérieures sur le calcul configuré avec un mode d’accès utilisateur unique.
- Databricks Runtime 14.3 LTS et versions ultérieures sur le calcul configuré avec le mode d’accès partagé.
- Pipelines Delta Live Tables configurés sans Unity Catalog.
Azure Databricks ne prend pas en charge l’authentification Microsoft Entra ID avec un certificat dans n’importe quel environnement Compute ou dans les pipelines Delta Live Tables configurés avec Unity Catalog.
Cette authentification ne fonctionne pas sur les clusters partagés ou sur Unity Catalog Delta Live Tables.
Configuration du connecteur Kafka Structured Streaming
Pour effectuer l’authentification avec Microsoft Entra ID, vous aurez besoin des valeurs suivantes :
Un ID de locataire. Vous pouvez le trouver dans l’onglet Services Microsoft Entra ID.
Un ID client (également appelé ID d’application).
Une clé secrète client. Une fois que vous l’avez, vous devez l’ajouter en tant que secret à votre espace de travail Databricks. Pour ajouter ce secret, consultez Gestion des secrets.
Une rubrique EventHubs. Vous trouverez une liste de rubriques dans la section Event Hubs sous la section Entités d’une page Espace de noms Event Hubs spécifique. Pour utiliser plusieurs rubriques, vous pouvez définir le rôle IAM au niveau d’Event Hubs.
Un serveur EventHubs. Vous le trouverez dans la page Vue d’ensemble de votre espace de noms Event Hubs spécifique :
En outre, pour utiliser Entra ID, nous devons indiquer à Kafka d’utiliser le mécanisme SASL OAuth (SASL est un protocole générique, et OAuth est un type de « mécanisme » SASL) :
kafka.security.protocol
doit avoir la valeurSASL_SSL
kafka.sasl.mechanism
doit avoir la valeurOAUTHBEARER
kafka.sasl.login.callback.handler.class
doit être un nom complet de la classe Java avec la valeurkafkashaded
sur le gestionnaire de rappel de connexion de notre classe Kafka ombrée. Consultez l’exemple suivant pour connaître la classe exacte.
Exemple
Examinons ensuite un exemple en cours d’exécution :
Python
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
Gestion des erreurs potentielles
Les options de diffusion en continu ne sont pas prises en charge.
Si vous essayez d’utiliser ce mécanisme d’authentification dans un pipeline Delta Live Tables configuré avec Unity Catalog, vous pouvez recevoir l’erreur suivante :
Pour résoudre cette erreur, utilisez une configuration de calcul prise en charge. Consultez Authentification du principal de service avec Microsoft Entra ID et Azure Event Hubs.
Échec de la création d’un
KafkaAdminClient
.Il s’agit d’une erreur interne levée par Kafka si l’une des options d’authentification suivantes est incorrecte :
- ID client (également appelé ID d’application)
- ID client
- Serveur EventHubs
Pour résoudre l’erreur, vérifiez que les valeurs sont correctes pour ces options.
En outre, vous pouvez voir cette erreur si vous modifiez les options de configuration fournies par défaut dans l’exemple (que vous n’avez pas été invité à modifier), par exemple
kafka.security.protocol
.Aucun enregistrement n’est retourné
Si vous essayez d’afficher ou de traiter votre DataFrame, mais que vous n’obtenez pas de résultats, vous obtiendrez l’affichage suivant dans l’interface utilisateur.
Ce message signifie que l’authentification a réussi, mais qu’EventHubs n’a retourné aucune donnée. Certaines raisons possibles (non exhaustives) sont les suivantes :
- Vous avez spécifié une rubrique EventHubs incorrecte.
- L’option de configuration Kafka par défaut pour
startingOffsets
estlatest
, et vous ne recevez actuellement aucune donnée via la rubrique. Vous pouvez définirstartingOffsetstoearliest
pour commencer à lire des données à partir des décalages les plus anciens de Kafka.