Processamento de fluxo com Apache Kafka e Azure Databricks
Este artigo descreve como você pode usar o Apache Kafka como uma fonte ou um coletor ao executar cargas de trabalho de Streaming Estruturado no Azure Databricks.
Para mais informações sobre Kafka, consulte a documentação de Kafka.
Ler dados de Kafka
A seguir está um exemplo para uma leitura de streaming de Kafka:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
O Azure Databricks também dá suporte à semântica de leitura em lote para fontes de dados Kafka, conforme mostrado no exemplo a seguir:
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
Para carregamento incremental em lote, o Databricks recomenda o uso do Kafka com Trigger.AvailableNow
o . Consulte Configurando o processamento incremental em lote.
No Databricks Runtime 13.3 LTS e superior, o Azure Databricks fornece uma função SQL para ler dados Kafka. O streaming com SQL é suportado apenas em Delta Live Tables ou com tabelas de streaming em Databricks SQL. Consulte read_kafka função com valor de tabela.
Configurar o leitor Kafka Structured Streaming
O Azure Databricks fornece a palavra-chave kafka
como um formato de dados para configurar conexões com o Kafka 0.10+.
A seguir estão as configurações mais comuns para Kafka:
Há várias maneiras de especificar quais tópicos assinar. Você deve fornecer apenas um destes parâmetros:
Opção | valor | Description |
---|---|---|
subscrever | Uma lista de tópicos separados por vírgula. | A lista de tópicos para se inscrever. |
subscribePattern | Cadeia de caracteres regex Java. | O padrão usado para se inscrever no(s) tópico(s). |
atribuir | Cadeia de caracteres {"topicA":[0,1],"topic":[2,4]} JSON . |
Tópico específicoPartições a consumir. |
Outras configurações notáveis:
Opção | valor | Valor Predefinido | Description |
---|---|---|---|
kafka.bootstrap.servidores | Lista separada por vírgulas de host:port. | empty | [Obrigatório] A configuração de Kafka bootstrap.servers . Se você achar que não há dados de Kafka, verifique a lista de endereços do corretor primeiro. Se a lista de endereços do corretor estiver incorreta, pode não haver erros. Isso ocorre porque o cliente Kafka assume que os corretores ficarão disponíveis eventualmente e, no caso de erros de rede, tente novamente para sempre. |
failOnDataLoss |
true ou false . |
true |
[Opcional] Se a consulta deve falhar quando é possível que os dados tenham sido perdidos. As consultas podem falhar permanentemente ao ler dados de Kafka devido a muitos cenários, como tópicos excluídos, truncamento de tópico antes do processamento e assim por diante. Tentamos estimar de forma conservadora se os dados foram possivelmente perdidos ou não. Às vezes, isso pode causar falsos alarmes. Defina essa opção como false se ela não funcionar conforme o esperado ou se você quiser que a consulta continue processando apesar da perda de dados. |
minPartições | Inteiro >= 0, 0 = desativado. | 0 (desativado) | [Opcional] Número mínimo de partições para ler a partir de Kafka. Você pode configurar o Spark para usar um mínimo arbitrário de partições para ler do Kafka usando a minPartitions opção. Normalmente, o Spark tem um mapeamento 1-1 de Kafka topicPartitions para partições Spark consumindo de Kafka. Se você definir a opção minPartitions para um valor maior do que o seu tópico KafkaPartitions, o Spark dividirá partições Kafka grandes em partes menores. Essa opção pode ser definida em momentos de pico de cargas, distorção de dados e à medida que seu fluxo está ficando para trás para aumentar a taxa de processamento. Ele tem um custo de inicialização de consumidores Kafka em cada gatilho, o que pode afetar o desempenho se você usar SSL ao se conectar ao Kafka. |
kafka.group.id | Um ID de grupo de consumidores Kafka. | não definido | [Opcional] ID de grupo para usar durante a leitura de Kafka. Utilize isto com precaução. Por padrão, cada consulta gera um ID de grupo exclusivo para leitura de dados. Isso garante que cada consulta tenha seu próprio grupo de consumidores que não enfrente interferência de nenhum outro consumidor e, portanto, possa ler todas as partições de seus tópicos inscritos. Em alguns cenários (por exemplo, autorização baseada em grupo Kafka), convém usar IDs de grupo autorizadas específicas para ler dados. Opcionalmente, você pode definir o ID do grupo. No entanto, faça isso com extrema cautela, pois pode causar um comportamento inesperado. - A execução simultânea de consultas (em lote, em lote e streaming) com o mesmo ID de grupo provavelmente interfere entre si, fazendo com que cada consulta leia apenas parte dos dados. - Isso também pode ocorrer quando as consultas são iniciadas/reiniciadas em rápida sucessão. Para minimizar esses problemas, defina a configuração do consumidor Kafka session.timeout.ms para ser muito pequena. |
iniciandoOffsets | mais cedo , mais recente | mais recente | [Opcional] O ponto inicial quando uma consulta é iniciada, seja "mais antiga", que é dos primeiros deslocamentos, ou uma string JSON especificando um deslocamento inicial para cada TopicPartition. No JSON, -2 pode ser usado como um deslocamento para se referir ao mais antigo, e -1 ao mais recente. Nota: Para consultas em lote, o mais recente (implicitamente ou usando -1 em json) não é permitido. Para consultas de streaming, isto só se aplica quando uma nova consulta é iniciada, e a retomada sempre continuará de onde a consulta parou. As partições recém-descobertas durante uma consulta começarão no mínimo. |
Consulte o Guia de Integração do Kafka de Streaming Estruturado para obter outras configurações opcionais.
Esquema para registros Kafka
O esquema dos registros de Kafka é:
Coluna | Tipo |
---|---|
key | binário |
valor | binário |
topic | string |
partição | número inteiro |
Compensação | long |
carimbo de data/hora | long |
timestampType | número inteiro |
O key
e o value
são sempre desserializados como matrizes de bytes com o ByteArrayDeserializer
. Use operações DataFrame (como cast("string")
) para desserializar explicitamente as chaves e valores.
Gravar dados em Kafka
Segue-se um exemplo de uma gravação de streaming para Kafka:
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
O Azure Databricks também dá suporte à semântica de gravação em lote para coletores de dados Kafka, conforme mostrado no exemplo a seguir:
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Configurar o gravador de Kafka Structured Streaming
Importante
O Databricks Runtime 13.3 LTS e superior inclui uma versão mais recente da kafka-clients
biblioteca que permite gravações idempotentes por padrão. Se um coletor Kafka usar a versão 2.8.0 ou inferior com ACLs configuradas, mas sem IDEMPOTENT_WRITE
habilitadas, a gravação falhará com a mensagem org.apache.kafka.common.KafkaException:
Cannot execute transactional method because we are in an error state
de erro .
Resolva esse erro atualizando para Kafka versão 2.8.0 ou superior, ou definindo .option(“kafka.enable.idempotence”, “false”)
ao configurar seu gravador de Streaming Estruturado.
O esquema fornecido ao DataStreamWriter interage com o coletor Kafka. Você pode usar os seguintes campos:
Nome da coluna | Obrigatório ou opcional | Tipo |
---|---|---|
key |
opcional |
STRING ou BINARY |
value |
obrigatório |
STRING ou BINARY |
headers |
opcional | ARRAY |
topic |
opcional (ignorado se topic estiver definido como opção de gravador) |
STRING |
partition |
opcional | INT |
A seguir estão as opções comuns definidas ao escrever para Kafka:
Opção | valor | Valor predefinido | Description |
---|---|---|---|
kafka.boostrap.servers |
Uma lista separada por vírgulas de <host:port> |
nenhum | [Obrigatório] A configuração de Kafka bootstrap.servers . |
topic |
STRING |
não definido | [Opcional] Define o tópico para todas as linhas a serem escritas. Esta opção substitui qualquer coluna de tópico que exista nos dados. |
includeHeaders |
BOOLEAN |
false |
[Opcional] Se os cabeçalhos de Kafka devem ser incluídos na linha. |
Consulte o Guia de Integração do Kafka de Streaming Estruturado para obter outras configurações opcionais.
Recuperar métricas de Kafka
Você pode obter a média, o mínimo e o máximo do número de offsets que a consulta de streaming está atrás do último offset disponível entre todos os tópicos subscritos, obtidas com as métricas avgOffsetsBehindLatest
, maxOffsetsBehindLatest
e minOffsetsBehindLatest
. Consulte Leitura de métricas interativamente.
Nota
Disponível no Databricks Runtime 9.1 e superior.
Obtenha o número total estimado de bytes que o processo de consulta não consumiu dos tópicos inscritos examinando o valor de estimatedTotalBytesBehindLatest
. Esta estimativa baseia-se nos lotes que foram processados nos últimos 300 segundos. O período de tempo em que a estimativa se baseia pode ser alterado definindo a opção bytesEstimateWindowLength
para um valor diferente. Por exemplo, para defini-lo como 10 minutos:
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Se você estiver executando o fluxo em um bloco de anotações, poderá ver essas métricas na guia Dados brutos no painel de progresso da consulta de streaming:
{
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic]]",
"metrics" : {
"avgOffsetsBehindLatest" : "4.0",
"maxOffsetsBehindLatest" : "4",
"minOffsetsBehindLatest" : "4",
"estimatedTotalBytesBehindLatest" : "80.0"
},
} ]
}
Usar SSL para conectar o Azure Databricks ao Kafka
Para habilitar conexões SSL com Kafka, siga as instruções na documentação do Confluent Criptografia e autenticação com SSL. Você pode fornecer as configurações descritas lá, prefixadas com kafka.
, como opções. Por exemplo, você especifica o local de armazenamento confiável na propriedade kafka.ssl.truststore.location
.
A Databricks recomenda que você:
- Armazene seus certificados no armazenamento de objetos na nuvem. Você pode restringir o acesso aos certificados apenas aos clusters que podem acessar o Kafka. Consulte Governança de dados com o Catálogo Unity.
- Armazene suas senhas de certificado como segredos em um escopo secreto.
O exemplo a seguir usa locais de armazenamento de objetos e segredos do Databricks para habilitar uma conexão SSL:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Conectar o Kafka no HDInsight ao Azure Databricks
Crie um cluster HDInsight Kafka.
Consulte Conectar-se ao Kafka no HDInsight por meio de uma Rede Virtual do Azure para obter instruções.
Configure os corretores Kafka para anunciar o endereço correto.
Siga as instruções em Configurar Kafka para publicidade IP. Se você gerencia o Kafka por conta própria nas Máquinas Virtuais do Azure, certifique-se de que a configuração
advertised.listeners
dos brokers esteja definida como o IP interno dos hosts.Crie um cluster do Azure Databricks.
Emparelhe o cluster Kafka ao cluster do Azure Databricks.
Siga as instruções em Redes virtuais de mesmo nível.
Autenticação da entidade de serviço com o Microsoft Entra ID e os Hubs de Eventos do Azure
O Azure Databricks dá suporte à autenticação de trabalhos do Spark com serviços de Hubs de Eventos. Essa autenticação é feita via OAuth com o Microsoft Entra ID.
O Azure Databricks dá suporte à autenticação de ID do Microsoft Entra com uma ID de cliente e segredo nos seguintes ambientes de computação:
- Databricks Runtime 12.2 LTS e superior em computação configurada com modo de acesso de usuário único.
- Databricks Runtime 14.3 LTS e superior em computação configurada com modo de acesso compartilhado.
- Pipelines Delta Live Tables configurados sem Unity Catalog.
O Azure Databricks não oferece suporte à autenticação de identidade do Microsoft Entra com um certificado em qualquer ambiente de computação ou em canalizações do Delta Live Tables configuradas com o Unity Catalog.
Essa autenticação não funciona em clusters compartilhados ou em Unity Catalog Delta Live Tables.
Configurando o conector Kafka de streaming estruturado
Para executar a autenticação com o Microsoft Entra ID, você precisará dos seguintes valores:
Um ID de locatário. Você pode encontrar isso na guia Serviços do Microsoft Entra ID .
Um clientID (também conhecido como ID do aplicativo).
Um segredo do cliente. Depois de ter isso, você deve adicioná-lo como um segredo ao seu espaço de trabalho Databricks. Para adicionar esse segredo, consulte Gerenciamento secreto.
Um tópico do EventHubs. Você pode encontrar uma lista de tópicos na secção Hubs de Eventos sob a secção Entidades numa página específica de Namespace de Hubs de Eventos . Para trabalhar com vários tópicos, você pode definir a função do IAM no nível dos Hubs de Eventos.
Um servidor EventHubs. Você pode encontrar isso na página de visão geral do seu namespace específico de Hubs de Eventos:
Além disso, para usar o Entra ID, precisamos dizer a Kafka para usar o mecanismo SASL OAuth (SASL é um protocolo genérico, e OAuth é um tipo de "mecanismo" SASL):
-
kafka.security.protocol
deve serSASL_SSL
-
kafka.sasl.mechanism
deve serOAUTHBEARER
-
kafka.sasl.login.callback.handler.class
deve ser um nome totalmente qualificado da classe Java com um valor de para o manipulador de retorno de chamada dekafkashaded
login de nossa classe Kafka sombreada. Veja o exemplo a seguir para a classe exata.
Exemplo
Em seguida, vejamos um exemplo em execução:
Python
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
Tratamento de possíveis erros
As opções de streaming não são suportadas.
Se você tentar usar esse mecanismo de autenticação em um pipeline Delta Live Tables configurado com o Unity Catalog, poderá receber o seguinte erro:
Para resolver esse erro, use uma configuração de computação suportada. Consulte Autenticação da entidade de serviço com a ID do Microsoft Entra e Hubs de Eventos do Azure.
Falha ao criar um novo
KafkaAdminClient
arquivo .Este é um erro interno que Kafka lança se qualquer uma das seguintes opções de autenticação estiver incorreta:
- ID do cliente (também conhecido como ID do aplicativo)
- ID de Inquilino do
- Servidor EventHubs
Para resolver o erro, verifique se os valores estão corretos para essas opções.
Além disso, poderá ver este erro se modificar as opções de configuração fornecidas por predefinição no exemplo (que lhe foi pedido para não modificar), como
kafka.security.protocol
.Não há registros sendo devolvidos
Se você estiver tentando exibir ou processar seu DataFrame, mas não estiver obtendo resultados, verá o seguinte na interface do usuário.
Essa mensagem significa que a autenticação foi bem-sucedida, mas o EventHubs não retornou nenhum dado. Algumas razões possíveis (embora não exaustivas) são:
- Você especificou o tópico EventHubs errado.
- A opção de configuração padrão do Kafka para
startingOffsets
élatest
, e você ainda não está recebendo nenhum dado através do tópico. Você pode definirstartingOffsetstoearliest
para começar a ler dados a partir dos primeiros deslocamentos de Kafka.