Delen via


Apache Kafka-trigger voor Azure Functions

U kunt de Apache Kafka-trigger in Azure Functions gebruiken om uw functiecode uit te voeren als reactie op berichten in Kafka-onderwerpen. U kunt ook een Kafka-uitvoerbinding gebruiken om vanuit uw functie naar een onderwerp te schrijven. Zie het overzicht van Apache Kafka-bindingen voor Azure Functions voor meer informatie over de installatie- en configuratiedetails.

Belangrijk

Kafka-bindingen zijn alleen beschikbaar voor Functions in het Elastic Premium-abonnement en het Toegewezen (App Service)-abonnement. Ze worden alleen ondersteund op versie 3.x en latere versie van de Functions-runtime.

Opmerking

Het gebruik van de trigger is afhankelijk van de C#-modaliteit die wordt gebruikt in uw functie-app. Dit kan een van de volgende modi zijn:

Een geïsoleerde werkprocesklassebibliotheek gecompileerde C#-functie wordt uitgevoerd in een proces dat is geïsoleerd van de runtime.

De kenmerken die u gebruikt, zijn afhankelijk van de specifieke gebeurtenisprovider.

In het volgende voorbeeld ziet u een C#-functie die het Kafka-bericht leest en registreert als een Kafka-gebeurtenis:

[Function("KafkaTrigger")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}

Als u gebeurtenissen in een batch wilt ontvangen, gebruikt u een tekenreeksmatrix als invoer, zoals wordt weergegeven in het volgende voorbeeld:

[Function("KafkaTriggerMany")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default",
                  IsBatched = true)] string[] events, FunctionContext context)
{
    foreach (var kevent in events)
    {
        var logger = context.GetLogger("KafkaFunction");
        logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
    }

De volgende functie registreert het bericht en de headers voor de Kafka-gebeurtenis:

[Function("KafkaTriggerWithHeaders")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var eventJsonObject = JObject.Parse(eventData);
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
    var headersJArr = eventJsonObject["Headers"] as JArray;
    logger.LogInformation("Headers for this event: ");
    foreach (JObject header in headersJArr)
    {
        logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");

    }
}

Zie de Kafka-extensieopslagplaats voor een volledige set werkende .NET-voorbeelden.

De specifieke eigenschappen van het function.json-bestand zijn afhankelijk van uw gebeurtenisprovider, die in deze voorbeelden Confluent of Azure Event Hubs zijn. In de volgende voorbeelden ziet u een Kafka-trigger voor een functie die een Kafka-bericht leest en registreert.

De volgende function.json definieert de trigger voor de specifieke provider:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "topic": "topic",
            "brokerList": "%BrokerList%",
            "username": "%ConfluentCloudUserName%",
            "password": "%ConfluentCloudPassword%",
            "protocol": "saslSsl",
            "authenticationMode": "plain",
            "consumerGroup" : "$Default",
            "dataType": "string"
        }
    ]
}

De volgende code wordt vervolgens uitgevoerd wanneer de functie wordt geactiveerd:

module.exports = async function (context, event) {
    // context.log.info(event)
    context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};

Als u gebeurtenissen in een batch wilt ontvangen, stelt u de cardinality waarde many in op het function.json-bestand, zoals wordt weergegeven in de volgende voorbeelden:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%"
        }
    ]
}

De volgende code parseert vervolgens de matrix met gebeurtenissen en registreert de gebeurtenisgegevens:

module.exports = async function (context, events) {
    function print(event) {
        var eventJson = JSON.parse(event)
        context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
    }
    events.map(print);
};

Met de volgende code worden ook de headergegevens vastgelegd:

module.exports = async function (context, event) {
  function print(kevent) {
    var keventJson = JSON.parse(kevent)
    context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
    context.log.info(`Headers for this message:`)
    let headers =  keventJson.Headers;
    headers.forEach(element => {
        context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`) 
    });
  }
  event.map(print);
};

U kunt een algemeen Avro-schema definiëren voor de gebeurtenis die aan de trigger is doorgegeven. De volgende function.json definieert de trigger voor de specifieke provider met een algemeen Avro-schema:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaAvroGenericSingle",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "authenticationMode" : "PLAIN",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

De volgende code wordt vervolgens uitgevoerd wanneer de functie wordt geactiveerd:

module.exports = async function (context, event) {
    context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};

Zie de Kafka-extensieopslagplaats voor een volledige set werkende JavaScript-voorbeelden.

De specifieke eigenschappen van het function.json-bestand zijn afhankelijk van uw gebeurtenisprovider, die in deze voorbeelden Confluent of Azure Event Hubs zijn. In de volgende voorbeelden ziet u een Kafka-trigger voor een functie die een Kafka-bericht leest en registreert.

De volgende function.json definieert de trigger voor de specifieke provider:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

De volgende code wordt vervolgens uitgevoerd wanneer de functie wordt geactiveerd:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

Als u gebeurtenissen in een batch wilt ontvangen, stelt u de cardinality waarde many in op het function.json-bestand, zoals wordt weergegeven in de volgende voorbeelden:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

De volgende code parseert vervolgens de matrix met gebeurtenissen en registreert de gebeurtenisgegevens:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
    $event = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $event.Value"
}

Met de volgende code worden ook de headergegevens vastgelegd:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

foreach ($kafkaEvent in $kafkaEvents) {
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
    Write-Output "Headers for this message:"
    foreach ($header in $kevent.Headers) {
        $DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
        $Key = $header.Key
        Write-Output "Key: $Key Value: $DecodedValue"
    }
}

U kunt een algemeen Avro-schema definiëren voor de gebeurtenis die aan de trigger is doorgegeven. De volgende function.json definieert de trigger voor de specifieke provider met een algemeen Avro-schema:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvent",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

De volgende code wordt vervolgens uitgevoerd wanneer de functie wordt geactiveerd:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

Zie de Kafka-extensieopslagplaats voor een volledige set werkende PowerShell-voorbeelden.

De specifieke eigenschappen van het function.json-bestand zijn afhankelijk van uw gebeurtenisprovider, die in deze voorbeelden Confluent of Azure Event Hubs zijn. In de volgende voorbeelden ziet u een Kafka-trigger voor een functie die een Kafka-bericht leest en registreert.

De volgende function.json definieert de trigger voor de specifieke provider:

{
      "scriptFile": "main.py",
      "bindings": [
        {
          "type": "kafkaTrigger",
          "name": "kevent",
          "topic": "topic",
          "brokerList": "%BrokerList%",
          "username": "%ConfluentCloudUserName%",
          "password": "%ConfluentCloudPassword%",
          "consumerGroup" : "functions",
          "protocol": "saslSsl",
          "authenticationMode": "plain"
        }
    ]
}

De volgende code wordt vervolgens uitgevoerd wanneer de functie wordt geactiveerd:

import logging
from azure.functions import KafkaEvent

def main(kevent : KafkaEvent):
    logging.info(kevent.get_body().decode('utf-8'))
    logging.info(kevent.metadata)

Als u gebeurtenissen in een batch wilt ontvangen, stelt u de cardinality waarde many in op het function.json-bestand, zoals wordt weergegeven in de volgende voorbeelden:

{
      "scriptFile": "main.py",
      "bindings": [
        {
            "type" : "kafkaTrigger",
            "direction": "in",
            "name" : "kevents",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "topic" : "message_python",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "dataType": "string",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "BrokerList" : "%BrokerList%"    
        }
    ]
}

De volgende code parseert vervolgens de matrix met gebeurtenissen en registreert de gebeurtenisgegevens:

import logging
import typing
from azure.functions import KafkaEvent

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

Met de volgende code worden ook de headergegevens vastgelegd:

import logging
import typing
from azure.functions import KafkaEvent
import json
import base64

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        event_dec = event.get_body().decode('utf-8')
        event_json = json.loads(event_dec)
        logging.info("Python Kafka trigger function called for message " + event_json["Value"])
        headers = event_json["Headers"]
        for header in headers:
            logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))

U kunt een algemeen Avro-schema definiëren voor de gebeurtenis die aan de trigger is doorgegeven. De volgende function.json definieert de trigger voor de specifieke provider met een algemeen Avro-schema:

{
  "scriptFile": "main.py",
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaTriggerAvroGeneric",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

De volgende code wordt vervolgens uitgevoerd wanneer de functie wordt geactiveerd:

import logging
from azure.functions import KafkaEvent

def main(kafkaTriggerAvroGeneric : KafkaEvent):
    logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
    logging.info(kafkaTriggerAvroGeneric.metadata)

Zie de Kafka-extensieopslagplaats voor een volledige set werkende Python-voorbeelden.

De aantekeningen die u gebruikt om uw trigger te configureren, zijn afhankelijk van de specifieke gebeurtenisprovider.

In het volgende voorbeeld ziet u een Java-functie die de inhoud van de Kafka-gebeurtenis leest en registreert:

@FunctionName("KafkaTrigger")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string"
         ) String kafkaEventData,
        final ExecutionContext context) {
        context.getLogger().info(kafkaEventData);
}

Als u gebeurtenissen in een batch wilt ontvangen, gebruikt u een invoertekenreeks als een matrix, zoals wordt weergegeven in het volgende voorbeeld:

@FunctionName("KafkaTriggerMany")
public void runMany(
        @KafkaTrigger(
            name = "kafkaTriggerMany",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            cardinality = Cardinality.MANY,
            dataType = "string"
         ) String[] kafkaEvents,
        final ExecutionContext context) {
        for (String kevent: kafkaEvents) {
            context.getLogger().info(kevent);
        }    
}

De volgende functie registreert het bericht en de headers voor de Kafka-gebeurtenis:

@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string",
            cardinality = Cardinality.MANY
         ) List<String> kafkaEvents,
        final ExecutionContext context) {
            Gson gson = new Gson(); 
            for (String keventstr: kafkaEvents) {
                KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
                context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
                context.getLogger().info("Headers for the message:");
                for (KafkaHeaders header : kevent.Headers) {
                    String decodedValue = new String(Base64.getDecoder().decode(header.Value));
                    context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);                    
                }                
            }
        }

U kunt een algemeen Avro-schema definiëren voor de gebeurtenis die aan de trigger is doorgegeven. De volgende functie definieert een trigger voor de specifieke provider met een algemeen Avro-schema:

private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";

@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
        @KafkaTrigger(
                name = "kafkaAvroGenericSingle",
                topic = "topic",
                brokerList="%BrokerList%",
                consumerGroup="$Default",
                username = "ConfluentCloudUsername",
                password = "ConfluentCloudPassword",
                avroSchema = schema,
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL) Payment payment,
        final ExecutionContext context) {
    context.getLogger().info(payment.toString());
}

Zie de Kafka-extensieopslagplaats voor een volledige set werkende Java-voorbeelden voor Confluent.

Kenmerken

C #-bibliotheken voor zowel in-process - als geïsoleerde werkprocessen maken gebruik van de KafkaTriggerAttribute functietrigger om de functietrigger te definiëren.

In de volgende tabel worden de eigenschappen uitgelegd die u kunt instellen met behulp van dit triggerkenmerk:

Parameter Description
BrokerList (Vereist) De lijst met Kafka-brokers die worden bewaakt door de trigger. Zie Verbindingen voor meer informatie.
Onderwerp (Vereist) Het onderwerp dat wordt bewaakt door de trigger.
ConsumerGroup (Optioneel) Kafka-consumentengroep die door de trigger wordt gebruikt.
AvroSchema (Optioneel) Schema van een algemene record bij gebruik van het Avro-protocol.
AuthenticationMode (Optioneel) De verificatiemodus bij het gebruik van SASL-verificatie (Simple Authentication and Security Layer). De ondersteunde waarden zijn Gssapi, Plain (standaard), ScramSha256. ScramSha512
Gebruikersnaam (Optioneel) De gebruikersnaam voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie.
Wachtwoord (Optioneel) Het wachtwoord voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie.
Protocol (Optioneel) Het beveiligingsprotocol dat wordt gebruikt bij het communiceren met brokers. De ondersteunde waarden zijn plaintext (standaard), ssl, . sasl_sslsasl_plaintext
SslCaLocation (Optioneel) Pad naar ca-certificaatbestand voor het verifiëren van het certificaat van de broker.
SslCertificateLocation (Optioneel) Pad naar het certificaat van de client.
SslKeyLocation (Optioneel) Pad naar de persoonlijke sleutel van de client (PEM) die wordt gebruikt voor verificatie.
SslKeyPassword (Optioneel) Wachtwoord voor het certificaat van de client.

Aantekeningen

Met de KafkaTrigger aantekening kunt u een functie maken die wordt uitgevoerd wanneer een onderwerp wordt ontvangen. Ondersteunde opties omvatten de volgende elementen:

Element Description
name (Vereist) De naam van de variabele die de wachtrij of het onderwerpbericht in functiecode vertegenwoordigt.
brokerList (Vereist) De lijst met Kafka-brokers die worden bewaakt door de trigger. Zie Verbindingen voor meer informatie.
onderwerp (Vereist) Het onderwerp dat wordt bewaakt door de trigger.
cardinality (Optioneel) Geeft de kardinaliteit van de triggerinvoer aan. De ondersteunde waarden zijn ONE (standaard) en MANY. Gebruik ONE wanneer de invoer één bericht is en MANY wanneer de invoer een matrix van berichten is. Wanneer u gebruikt MANY, moet u ook een dataType.
Datatype Definieert hoe Functions de parameterwaarde verwerkt. De waarde wordt standaard verkregen als een tekenreeks en Functions probeert de tekenreeks te deserialiseren naar een echt normaal oud Java-object (POJO). Wanneer string, wordt de invoer behandeld als alleen een tekenreeks. Wanneer binary, het bericht wordt ontvangen als binaire gegevens en Functions probeert het te deserialiseren naar een werkelijke parametertype byte[].
consumerGroup (Optioneel) Kafka-consumentengroep die door de trigger wordt gebruikt.
avroSchema (Optioneel) Schema van een algemene record bij gebruik van het Avro-protocol.
authenticationMode (Optioneel) De verificatiemodus bij het gebruik van SASL-verificatie (Simple Authentication and Security Layer). De ondersteunde waarden zijn Gssapi, Plain (standaard), ScramSha256. ScramSha512
gebruikersnaam (Optioneel) De gebruikersnaam voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie.
password (Optioneel) Het wachtwoord voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie.
protocol (Optioneel) Het beveiligingsprotocol dat wordt gebruikt bij het communiceren met brokers. De ondersteunde waarden zijn plaintext (standaard), ssl, . sasl_sslsasl_plaintext
sslCaLocation (Optioneel) Pad naar ca-certificaatbestand voor het verifiëren van het certificaat van de broker.
sslCertificateLocation (Optioneel) Pad naar het certificaat van de client.
sslKeyLocation (Optioneel) Pad naar de persoonlijke sleutel van de client (PEM) die wordt gebruikt voor verificatie.
sslKeyPassword (Optioneel) Wachtwoord voor het certificaat van de client.

Configuratie

In de volgende tabel worden de bindingsconfiguratie-eigenschappen uitgelegd die u in het function.json-bestand hebt ingesteld.

eigenschap function.json Beschrijving
type (Vereist) Moet worden ingesteld op kafkaTrigger.
direction (Vereist) Moet worden ingesteld op in.
name (Vereist) De naam van de variabele die de brokergegevens in functiecode vertegenwoordigt.
brokerList (Vereist) De lijst met Kafka-brokers die worden bewaakt door de trigger. Zie Verbindingen voor meer informatie.
onderwerp (Vereist) Het onderwerp dat wordt bewaakt door de trigger.
cardinality (Optioneel) Geeft de kardinaliteit van de triggerinvoer aan. De ondersteunde waarden zijn ONE (standaard) en MANY. Gebruik ONE wanneer de invoer één bericht is en MANY wanneer de invoer een matrix van berichten is. Wanneer u gebruikt MANY, moet u ook een dataType.
Datatype Definieert hoe Functions de parameterwaarde verwerkt. De waarde wordt standaard verkregen als een tekenreeks en Functions probeert de tekenreeks te deserialiseren naar een echt normaal oud Java-object (POJO). Wanneer string, wordt de invoer behandeld als alleen een tekenreeks. Wanneer binary, het bericht wordt ontvangen als binaire gegevens en Functions probeert het te deserialiseren naar een werkelijke parametertype byte[].
consumerGroup (Optioneel) Kafka-consumentengroep die door de trigger wordt gebruikt.
avroSchema (Optioneel) Schema van een algemene record bij gebruik van het Avro-protocol.
authenticationMode (Optioneel) De verificatiemodus bij het gebruik van SASL-verificatie (Simple Authentication and Security Layer). De ondersteunde waarden zijn Gssapi, Plain (standaard), ScramSha256. ScramSha512
gebruikersnaam (Optioneel) De gebruikersnaam voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie.
password (Optioneel) Het wachtwoord voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie.
protocol (Optioneel) Het beveiligingsprotocol dat wordt gebruikt bij het communiceren met brokers. De ondersteunde waarden zijn plaintext (standaard), ssl, . sasl_sslsasl_plaintext
sslCaLocation (Optioneel) Pad naar ca-certificaatbestand voor het verifiëren van het certificaat van de broker.
sslCertificateLocation (Optioneel) Pad naar het certificaat van de client.
sslKeyLocation (Optioneel) Pad naar de persoonlijke sleutel van de client (PEM) die wordt gebruikt voor verificatie.
sslKeyPassword (Optioneel) Wachtwoord voor het certificaat van de client.

Gebruik

Kafka-gebeurtenissen worden momenteel ondersteund als tekenreeksen en tekenreeksmatrices die JSON-nettoladingen zijn.

Kafka-berichten worden doorgegeven aan de functie als tekenreeksen en tekenreeksmatrices die JSON-nettoladingen zijn.

In een Premium-abonnement moet u runtimeschaalbewaking inschakelen voor de Kafka-uitvoer om uit te kunnen schalen naar meerdere exemplaren. Zie Schalen van runtime inschakelen voor meer informatie.

U kunt de functie Testen/uitvoeren van de pagina Code en test in Azure Portal niet gebruiken om te werken met Kafka-triggers. U moet in plaats daarvan testgebeurtenissen rechtstreeks verzenden naar het onderwerp dat wordt bewaakt door de trigger.

Zie host.json instellingen voor een volledige set ondersteunde host.json instellingen voor de Kafka-trigger.

Connecties

Alle verbindingsgegevens die vereist zijn voor uw triggers en bindingen, moeten worden onderhouden in toepassingsinstellingen en niet in de bindingsdefinities in uw code. Dit geldt voor referenties, die nooit in uw code moeten worden opgeslagen.

Belangrijk

Referentie-instellingen moeten verwijzen naar een toepassingsinstelling. Gebruik geen codereferenties in uw code- of configuratiebestanden. Wanneer u lokaal werkt, gebruikt u het local.settings.json-bestand voor uw referenties en publiceert u het local.settings.json bestand niet.

Wanneer u verbinding maakt met een beheerd Kafka-cluster dat wordt geleverd door Confluent in Azure, moet u ervoor zorgen dat de volgende verificatiereferenties voor uw Confluent Cloud-omgeving zijn ingesteld in uw trigger of binding:

Instelling Aanbevolen waarde Beschrijving
BrokerList BootstrapServer De app-instelling met de naam BootstrapServer bevat de waarde van de bootstrap-server die is gevonden op de pagina met Confluent Cloud-instellingen. De waarde lijkt op xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Gebruikersnaam ConfluentCloudUsername App-instelling met de naam ConfluentCloudUsername bevat de API-toegangssleutel van de Confluent Cloud-website.
Wachtwoord ConfluentCloudPassword App-instelling met de naam ConfluentCloudPassword bevat het API-geheim dat is verkregen van de Confluent Cloud-website.

De tekenreekswaarden die u voor deze instellingen gebruikt, moeten aanwezig zijn als toepassingsinstellingen in Azure of in de Values verzameling in het local.settings.json-bestand tijdens lokale ontwikkeling.

U moet ook de Protocol, AuthenticationModeen SslCaLocation in uw bindingsdefinities instellen.

Volgende stappen