Compartir vía


Información general de los enlaces de Apache Kafka para Azure Functions

La extensión de Kafka para Azure Functions permite escribir valores en temas de Apache Kafka mediante un enlace de salida. También puede usar un desencadenador para invocar las funciones en respuesta a los mensajes de los temas de Kafka.

Importante

Los enlaces de Kafka solo están disponibles para las instancias de Functions del plan elástico prémium y del plan dedicado (App Service). Solo se admiten en la versión 3.x y posteriores del entorno de ejecución de Functions.

Acción Tipo
Ejecute una función basada en un nuevo evento de Kafka. Desencadenador
Escriba en la secuencia de eventos de Kafka. Enlace de salida

Instalación de la extensión

El paquete NuGet de la extensión que instale depende del modo de C# que esté usando en la aplicación de funciones:

Las funciones se ejecutan en un proceso de trabajo de C# aislado. Para más información, consulte Guía para ejecutar C# Azure Functions en un proceso de trabajo aislado.

Para agregar la extensión al proyecto, instale este paquete NuGet.

Instalación del conjunto

La extensión de Kafka forma parte de un conjunto de extensiones, que se especifica en el archivo de proyecto host.json. Al crear un proyecto que tenga como destino la versión 3.x o posterior de Functions, ya debería tener instalado este conjunto. Para obtener más información, consulte Conjuntos de extensiones.

Habilitación del escalado de entorno de ejecución

Para permitir que las funciones se escalen correctamente en el plan Premium al usar desencadenadores y enlaces de Kafka, debe habilitar la supervisión del escalado del entorno de ejecución.

En Azure Portal, en la aplicación de funciones, elija Configuración y, en la pestaña Configuración del entorno de ejecución de la función, cambie la Supervisión de la escala en tiempo de ejecución a Activo.

Captura de pantalla del panel Azure Portal para habilitar el escalado en tiempo de ejecución

configuración de host.json

En esta sección se describen las opciones de configuración disponibles para este enlace en las versiones 3.x y posteriores. La configuración del archivo host.json se aplica a todas las funciones de una instancia de la aplicación de funciones. Para más información sobre la configuración de la aplicación de funciones en las versiones 3.x y posteriores, consulte la referencia de host.json para Azure Functions.

{
    "version": "2.0",
    "extensions": {
        "kafka": {
            "maxBatchSize": 64,
            "SubscriberIntervalInSeconds": 1,
            "ExecutorChannelCapacity": 1,
            "ChannelFullRetryIntervalInMs": 50
        }
    }
}

Propiedad Valor predeterminado Tipo Descripción
ChannelFullRetryIntervalInMs 50 Desencadenador Define el intervalo de reintento del suscriptor, en milisegundos, que se usa al intentar agregar elementos a un canal de capacidad.
ExecutorChannelCapacity 1 Ambos Define la capacidad del mensaje del canal. Una vez alcanzada la capacidad, el suscriptor de Kafka se detiene hasta que la función se pone al día.
MaxBatchSize 64 Desencadenador Tamaño máximo de lote al llamar a una función desencadenada por Kafka.
SubscriberIntervalInSeconds 1 Desencadenador Define la frecuencia mínima con la que se ejecutan los mensajes entrantes, por función en segundos. Solo cuando el volumen del mensaje es menor que MaxBatchSize / SubscriberIntervalInSeconds.

Las siguientes propiedades, que se heredan de la biblioteca cliente de C/C++ de Apache Kafka, también se admiten en la sección kafka de host.json, para desencadenadores o enlaces de salida y desencadenadores:

Propiedad Se aplica a Equivalente a librdkafka
AutoCommitIntervalMs Desencadenador auto.commit.interval.ms
AutoOffsetReset Desencadenador auto.offset.reset
FetchMaxBytes Desencadenador fetch.max.bytes
LibkafkaDebug Ambos debug
MaxPartitionFetchBytes Desencadenador max.partition.fetch.bytes
MaxPollIntervalMs Desencadenador max.poll.interval.ms
MetadataMaxAgeMs Ambos metadata.max.age.ms
QueuedMinMessages Desencadenador queued.min.messages
QueuedMaxMessagesKbytes Desencadenador queued.max.messages.kbytes
ReconnectBackoffMs Desencadenador reconnect.backoff.max.ms
ReconnectBackoffMaxMs Desencadenador reconnect.backoff.max.ms
SessionTimeoutMs Desencadenador session.timeout.ms
SocketKeepaliveEnable Ambos socket.keepalive.enable
StatisticsIntervalMs Desencadenador statistics.interval.ms

Pasos siguientes