Compartir vía


Desencadenador de Apache Kafka para Azure Functions

Puede usar el desencadenador de Apache Kafka en Azure Functions para ejecutar el código de función en respuesta a mensajes en temas de Kafka. También puede usar un enlace de salida de Kafka para escribir desde la función en un tema. Para obtener información sobre los detalles de instalación y configuración, consulte Información general de los enlaces de Apache Kafka para Azure Functions.

Importante

Los enlaces de Kafka solo están disponibles para las instancias de Functions del plan elástico prémium y del plan dedicado (App Service). Solo se admiten en la versión 3.x y posteriores del entorno de ejecución de Functions.

Ejemplo

El uso del desencadenador depende de la modalidad de C# que se usa en la aplicación de funciones, que puede ser uno de los modos siguientes:

Una función de C# compilada de la biblioteca de clases de procesos de trabajo aislados se ejecuta en un proceso aislado del entorno de ejecución.

Los atributos que use dependen del proveedor de eventos específico.

En el ejemplo siguiente se muestra una función de C# que lee y registra el mensaje de Kafka como un evento de Kafka:

[Function("KafkaTrigger")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}

Para recibir eventos en un lote, use una matriz de cadenas como entrada, como se muestra en el ejemplo siguiente:

[Function("KafkaTriggerMany")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default",
                  IsBatched = true)] string[] events, FunctionContext context)
{
    foreach (var kevent in events)
    {
        var logger = context.GetLogger("KafkaFunction");
        logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
    }

La función siguiente registra el mensaje y los encabezados del evento de Kafka:

[Function("KafkaTriggerWithHeaders")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var eventJsonObject = JObject.Parse(eventData);
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
    var headersJArr = eventJsonObject["Headers"] as JArray;
    logger.LogInformation("Headers for this event: ");
    foreach (JObject header in headersJArr)
    {
        logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");

    }
}

Para obtener un conjunto completo de ejemplos de .NET en funcionamiento, consulte el repositorio de extensiones de Kafka.

Nota:

Para obtener un conjunto equivalente de ejemplos de TypeScript, consulte el repositorio de extensiones de Kafka.

Las propiedades específicas del archivo function.json dependen del proveedor de eventos, que en estos ejemplos es Confluent o Azure Event Hubs. En los ejemplos siguientes se muestra un desencadenador de Kafka para una función que lee y registra un mensaje de Kafka.

El siguiente archivo function.json define el desencadenador para el proveedor específico:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "topic": "topic",
            "brokerList": "%BrokerList%",
            "username": "%ConfluentCloudUserName%",
            "password": "%ConfluentCloudPassword%",
            "protocol": "saslSsl",
            "authenticationMode": "plain",
            "consumerGroup" : "$Default",
            "dataType": "string"
        }
    ]
}

Después, se ejecuta el código siguiente cuando se desencadena la función:

module.exports = async function (context, event) {
    // context.log.info(event)
    context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};

Para recibir eventos en un lote, establezca el valor de cardinality en many en el archivo function.json, como se muestra en los ejemplos siguientes:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%"
        }
    ]
}

Después, el código siguiente analiza la matriz de eventos y registra los datos del evento:

module.exports = async function (context, events) {
    function print(event) {
        var eventJson = JSON.parse(event)
        context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
    }
    events.map(print);
};

El código siguiente también registra los datos del encabezado:

module.exports = async function (context, event) {
  function print(kevent) {
    var keventJson = JSON.parse(kevent)
    context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
    context.log.info(`Headers for this message:`)
    let headers =  keventJson.Headers;
    headers.forEach(element => {
        context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`) 
    });
  }
  event.map(print);
};

Puede definir un esquema de Avro genérico para el evento que se pasa al desencadenador. El siguiente archivo function.json define el desencadenador para el proveedor específico con un esquema de Avro genérico:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaAvroGenericSingle",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "authenticationMode" : "PLAIN",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Después, se ejecuta el código siguiente cuando se desencadena la función:

module.exports = async function (context, event) {
    context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};

Para obtener un conjunto completo de ejemplos de JavaScript en funcionamiento, consulte el repositorio de extensiones de Kafka.

Las propiedades específicas del archivo function.json dependen del proveedor de eventos, que en estos ejemplos es Confluent o Azure Event Hubs. En los ejemplos siguientes se muestra un desencadenador de Kafka para una función que lee y registra un mensaje de Kafka.

El siguiente archivo function.json define el desencadenador para el proveedor específico:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

Después, se ejecuta el código siguiente cuando se desencadena la función:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

Para recibir eventos en un lote, establezca el valor de cardinality en many en el archivo function.json, como se muestra en los ejemplos siguientes:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

Después, el código siguiente analiza la matriz de eventos y registra los datos del evento:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
    $event = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $event.Value"
}

El código siguiente también registra los datos del encabezado:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

foreach ($kafkaEvent in $kafkaEvents) {
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
    Write-Output "Headers for this message:"
    foreach ($header in $kevent.Headers) {
        $DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
        $Key = $header.Key
        Write-Output "Key: $Key Value: $DecodedValue"
    }
}

Puede definir un esquema de Avro genérico para el evento que se pasa al desencadenador. El siguiente archivo function.json define el desencadenador para el proveedor específico con un esquema de Avro genérico:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvent",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Después, se ejecuta el código siguiente cuando se desencadena la función:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

Para obtener un conjunto completo de ejemplos de PowerShell en funcionamiento, consulte el repositorio de extensiones de Kafka.

Las propiedades específicas del archivo function.json dependen del proveedor de eventos, que en estos ejemplos es Confluent o Azure Event Hubs. En los ejemplos siguientes se muestra un desencadenador de Kafka para una función que lee y registra un mensaje de Kafka.

El siguiente archivo function.json define el desencadenador para el proveedor específico:

{
      "scriptFile": "main.py",
      "bindings": [
        {
          "type": "kafkaTrigger",
          "name": "kevent",
          "topic": "topic",
          "brokerList": "%BrokerList%",
          "username": "%ConfluentCloudUserName%",
          "password": "%ConfluentCloudPassword%",
          "consumerGroup" : "functions",
          "protocol": "saslSsl",
          "authenticationMode": "plain"
        }
    ]
}

Después, se ejecuta el código siguiente cuando se desencadena la función:

import logging
from azure.functions import KafkaEvent

def main(kevent : KafkaEvent):
    logging.info(kevent.get_body().decode('utf-8'))
    logging.info(kevent.metadata)

Para recibir eventos en un lote, establezca el valor de cardinality en many en el archivo function.json, como se muestra en los ejemplos siguientes:

{
      "scriptFile": "main.py",
      "bindings": [
        {
            "type" : "kafkaTrigger",
            "direction": "in",
            "name" : "kevents",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "topic" : "message_python",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "dataType": "string",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "BrokerList" : "%BrokerList%"    
        }
    ]
}

Después, el código siguiente analiza la matriz de eventos y registra los datos del evento:

import logging
import typing
from azure.functions import KafkaEvent

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

El código siguiente también registra los datos del encabezado:

import logging
import typing
from azure.functions import KafkaEvent
import json
import base64

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        event_dec = event.get_body().decode('utf-8')
        event_json = json.loads(event_dec)
        logging.info("Python Kafka trigger function called for message " + event_json["Value"])
        headers = event_json["Headers"]
        for header in headers:
            logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))

Puede definir un esquema de Avro genérico para el evento que se pasa al desencadenador. El siguiente archivo function.json define el desencadenador para el proveedor específico con un esquema de Avro genérico:

{
  "scriptFile": "main.py",
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaTriggerAvroGeneric",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Después, se ejecuta el código siguiente cuando se desencadena la función:

import logging
from azure.functions import KafkaEvent

def main(kafkaTriggerAvroGeneric : KafkaEvent):
    logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
    logging.info(kafkaTriggerAvroGeneric.metadata)

Para obtener un conjunto completo de ejemplos de Python en funcionamiento, consulte el repositorio de extensiones de Kafka.

Las anotaciones que se usan para configurar el desencadenador dependen del proveedor de eventos específico.

En el ejemplo siguiente se muestra una función de Java que lee y registra el contenido del evento de Kafka:

@FunctionName("KafkaTrigger")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string"
         ) String kafkaEventData,
        final ExecutionContext context) {
        context.getLogger().info(kafkaEventData);
}

Para recibir eventos en un lote, use una cadena de entrada como una matriz, como se muestra en el ejemplo siguiente:

@FunctionName("KafkaTriggerMany")
public void runMany(
        @KafkaTrigger(
            name = "kafkaTriggerMany",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            cardinality = Cardinality.MANY,
            dataType = "string"
         ) String[] kafkaEvents,
        final ExecutionContext context) {
        for (String kevent: kafkaEvents) {
            context.getLogger().info(kevent);
        }    
}

La función siguiente registra el mensaje y los encabezados del evento de Kafka:

@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string",
            cardinality = Cardinality.MANY
         ) List<String> kafkaEvents,
        final ExecutionContext context) {
            Gson gson = new Gson(); 
            for (String keventstr: kafkaEvents) {
                KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
                context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
                context.getLogger().info("Headers for the message:");
                for (KafkaHeaders header : kevent.Headers) {
                    String decodedValue = new String(Base64.getDecoder().decode(header.Value));
                    context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);                    
                }                
            }
        }

Puede definir un esquema de Avro genérico para el evento que se pasa al desencadenador. La función siguiente define un desencadenador para el proveedor específico con un esquema de Avro genérico:

private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";

@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
        @KafkaTrigger(
                name = "kafkaAvroGenericSingle",
                topic = "topic",
                brokerList="%BrokerList%",
                consumerGroup="$Default",
                username = "ConfluentCloudUsername",
                password = "ConfluentCloudPassword",
                avroSchema = schema,
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL) Payment payment,
        final ExecutionContext context) {
    context.getLogger().info(payment.toString());
}

Para obtener un conjunto completo de ejemplos de Java en funcionamiento para Confluent, consulte el repositorio de extensiones de Kafka.

Atributos

Las bibliotecas de C# de procesos de trabajo aislados y en proceso usan KafkaTriggerAttribute para definir el desencadenador de la función.

En la tabla siguiente se explican las propiedades que se pueden establecer mediante este atributo de desencadenador:

Parámetro Descripción
BrokerList (Obligatorio) Lista de agentes de Kafka que supervisa el desencadenador. Consulte Conexiones para obtener más información.
Tema. (Obligatorio) Tema supervisado por el desencadenador.
ConsumerGroup (Opcional) Grupo de consumidores de Kafka que usa el desencadenador.
AvroSchema (Opcional) Esquema de un registro genérico al usar el protocolo de Avro.
AuthenticationMode (Opcional) Modo de autenticación cuando se usa la autenticación SASL (Nivel de seguridad y autenticación simples). Los valores admitidos son Gssapi, Plain (predeterminado), ScramSha256 y ScramSha512.
Nombre de usuario (Opcional) Nombre de usuario para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Consulte Conexiones para obtener más información.
Contraseña (Opcional) Contraseña para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Consulte Conexiones para obtener más información.
Protocolo (Opcional) Protocolo de seguridad que se usa al comunicarse con agentes. Los valores admitidos son plaintext (predeterminado), ssl, sasl_plaintext y sasl_ssl.
SslCaLocation (Opcional) Ruta de acceso al archivo de certificado de CA para comprobar el certificado del agente.
SslCertificateLocation (Opcional) Ruta de acceso al certificado del cliente.
SslKeyLocation (Opcional) Ruta de acceso a la clave privada (PEM) del cliente que se usa para la autenticación.
SslKeyPassword (Opcional) Contraseña del certificado del cliente.

anotaciones

La anotación KafkaTrigger permite crear una función que se ejecuta cuando se recibe un tema. Entre las opciones admitidas se incluyen los elementos siguientes:

Elemento Descripción
name (Obligatorio) Nombre de la variable que representa el mensaje de cola o tema en el código de la función.
brokerList (Obligatorio) Lista de agentes de Kafka que supervisa el desencadenador. Consulte Conexiones para obtener más información.
topic (Obligatorio) Tema supervisado por el desencadenador.
cardinalidad (Opcional) Indica la cardinalidad de la entrada del desencadenador. Los valores admitidos son ONE (predeterminado) y MANY. Se usa ONE cuando la entrada es un único mensaje y MANY cuando la entrada es una matriz de mensajes. Cuando se usa MANY, también se debe establecer un valor dataType.
dataType Define la manera en que Functions controla el valor del parámetro. De forma predeterminada, el valor se obtiene como una cadena y Functions intenta deserializarla en el objeto de Java antiguo sin formato (POJO) real. Cuando es string, la entrada se trata como una cadena. Cuando es binary, el mensaje se recibe como datos binarios y Functions intenta deserializarlo en bytes de tipo de parámetro real[].
consumerGroup (Opcional) Grupo de consumidores de Kafka que usa el desencadenador.
avroSchema (Opcional) Esquema de un registro genérico al usar el protocolo de Avro.
authenticationMode (Opcional) Modo de autenticación cuando se usa la autenticación SASL (Nivel de seguridad y autenticación simples). Los valores admitidos son Gssapi, Plain (predeterminado), ScramSha256 y ScramSha512.
username (Opcional) Nombre de usuario para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Consulte Conexiones para obtener más información.
password (Opcional) Contraseña para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Consulte Conexiones para obtener más información.
protocolo (Opcional) Protocolo de seguridad que se usa al comunicarse con agentes. Los valores admitidos son plaintext (predeterminado), ssl, sasl_plaintext y sasl_ssl.
sslCaLocation (Opcional) Ruta de acceso al archivo de certificado de CA para comprobar el certificado del agente.
sslCertificateLocation (Opcional) Ruta de acceso al certificado del cliente.
sslKeyLocation (Opcional) Ruta de acceso a la clave privada (PEM) del cliente que se usa para la autenticación.
sslKeyPassword (Opcional) Contraseña del certificado del cliente.

Configuración

En la siguiente tabla se explican las propiedades de configuración de enlace que se establecen en el archivo function.json.

Propiedad de function.json Descripción
type (Obligatorio) Se debe establecer en kafkaTrigger.
direction (Obligatorio) Se debe establecer en in.
name (Obligatorio) Nombre de la variable que representa los datos del agente en el código de la función.
brokerList (Obligatorio) Lista de agentes de Kafka que supervisa el desencadenador. Consulte Conexiones para obtener más información.
topic (Obligatorio) Tema supervisado por el desencadenador.
cardinalidad (Opcional) Indica la cardinalidad de la entrada del desencadenador. Los valores admitidos son ONE (predeterminado) y MANY. Se usa ONE cuando la entrada es un único mensaje y MANY cuando la entrada es una matriz de mensajes. Cuando se usa MANY, también se debe establecer un valor dataType.
dataType Define la manera en que Functions controla el valor del parámetro. De forma predeterminada, el valor se obtiene como una cadena y Functions intenta deserializarla en el objeto de Java antiguo sin formato (POJO) real. Cuando es string, la entrada se trata como una cadena. Cuando es binary, el mensaje se recibe como datos binarios y Functions intenta deserializarlo en bytes de tipo de parámetro real[].
consumerGroup (Opcional) Grupo de consumidores de Kafka que usa el desencadenador.
avroSchema (Opcional) Esquema de un registro genérico al usar el protocolo de Avro.
authenticationMode (Opcional) Modo de autenticación cuando se usa la autenticación SASL (Nivel de seguridad y autenticación simples). Los valores admitidos son Gssapi, Plain (predeterminado), ScramSha256 y ScramSha512.
username (Opcional) Nombre de usuario para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Consulte Conexiones para obtener más información.
password (Opcional) Contraseña para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Consulte Conexiones para obtener más información.
protocolo (Opcional) Protocolo de seguridad que se usa al comunicarse con agentes. Los valores admitidos son plaintext (predeterminado), ssl, sasl_plaintext y sasl_ssl.
sslCaLocation (Opcional) Ruta de acceso al archivo de certificado de CA para comprobar el certificado del agente.
sslCertificateLocation (Opcional) Ruta de acceso al certificado del cliente.
sslKeyLocation (Opcional) Ruta de acceso a la clave privada (PEM) del cliente que se usa para la autenticación.
sslKeyPassword (Opcional) Contraseña del certificado del cliente.

Uso

Los eventos de Kafka se admiten actualmente como cadenas y matrices de cadenas que son cargas JSON.

Los mensajes de Kafka se pasan a la función como cadenas y matrices de cadenas que son cargas JSON.

En un plan prémium, debe habilitar la supervisión del escalado en tiempo de ejecución para que la salida de Kafka pueda escalarse horizontalmente en varias instancias. Para obtener más información, consulte Habilitación del escalado en tiempo de ejecución.

No puede usar la característica Probar/ejecutar de la página Código y prueba de Azure Portal para trabajar con desencadenadores de Kafka. Debe enviar eventos de prueba directamente al tema supervisado por el desencadenador.

Para obtener un conjunto completo de configuraciones de host.json admitidas en el desencadenador de Kafka, consulte la configuración de host.json.

Conexiones

Toda la información de conexión que necesiten los desencadenadores y enlaces debe mantenerse en la configuración de la aplicación, y no en las definiciones de enlace del código. Lo mismo se aplica las credenciales, que nunca deben almacenarse en el código.

Importante

La configuración de credenciales debe hacer referencia a una configuración de aplicación. No codifique las credenciales de forma rígida en el código ni en los archivos de configuración. Cuando realice la ejecución de forma local, use el archivo local.settings.json para las credenciales y no publique el archivo local.settings.json.

Al conectarse a un clúster de Kafka administrado que haya proporcionado Confluent en Azure, asegúrese de que las siguientes credenciales de autenticación del entorno de Confluent Cloud están establecidas en el desencadenador o enlace:

Setting Valor recomendado Descripción
BrokerList BootstrapServer La configuración de la aplicación denominada BootstrapServer contiene el valor del servidor de arranque que se encuentra en la página de configuración de Confluent Cloud. El valor es similar a xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Nombre de usuario ConfluentCloudUsername La configuración de la aplicación denominada ConfluentCloudUsername contiene la clave de acceso de API del sitio web de Confluent Cloud.
Contraseña ConfluentCloudPassword La configuración de la aplicación denominada ConfluentCloudPassword contiene el secreto de API obtenido en el sitio web de Confluent Cloud.

Los valores de la cadena que use en esta configuración deben estar presentes como la configuración de la aplicación en Azure o en la colección Values en el archivo local.settings.json durante el desarrollo local.

También debe establecer Protocol, AuthenticationMode y SslCaLocation en las definiciones de enlace.

Pasos siguientes