Freigeben über


Apache Kafka-Ausgabebindung für Azure Functions

Mit der Ausgabebindung kann eine Azure Functions-App Nachrichten in ein Kafka-Thema schreiben.

Wichtig

Kafka-Bindungen sind für Functions nur im Elastic Premium-Plan im Dedicated-Plan (App Service) verfügbar. Sie werden nur in Version 3.x und höher der Functions-Laufzeit unterstützt.

Beispiel

Die Verwendung der Bindung hängt von der C#-Modalität ab, die in Ihrer Funktions-App verwendet wird. Dies kann eine der folgenden Modalitäten sein:

Eine Klassenbibliothek in einem isolierten Workerprozess ist eine kompilierte C#-Funktion, die in einem von der Runtime isolierten Prozess ausgeführt wird.

Die von Ihnen verwendeten Attribute hängen vom jeweiligen Ereignisanbieter ab.

Im folgenden Beispiel ist ein benutzerdefinierter Rückgabetyp MultipleOutputType vorhanden, der aus einer HTTP-Antwort und einer Kafka-Ausgabe besteht.

[Function("KafkaOutput")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();

    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = message,
        HttpResponse = response
    };
}

In der MultipleOutputType-Klasse ist Kevent die Ausgabebindungsvariable für die Kafka-Bindung.

public class MultipleOutputType
{
    [KafkaOutput("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string Kevent { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Um einen Batch von Ereignissen zu senden, übergeben Sie ein Zeichenfolgenarray an den Ausgabetyp, wie im folgenden Beispiel gezeigt:

[Function("KafkaOutputMany")]

public static MultipleOutputTypeForBatch Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");
    var response = req.CreateResponse(HttpStatusCode.OK);

    string[] messages = new string[2];
    messages[0] = "one";
    messages[1] = "two";

    return new MultipleOutputTypeForBatch()
    {
        Kevents = messages,
        HttpResponse = response
    };
}

Das Zeichenfolgenarray wird als Kevents-Eigenschaft für die Klasse definiert, für die die Ausgabebindung definiert ist:

public class MultipleOutputTypeForBatch
{
    [KafkaOutput("BrokerList",
                 "topic",
                 Username = "ConfluentCloudUserName",
                 Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string[] Kevents { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Die folgende Funktion fügt den Kafka-Ausgabedaten Header hinzu:

[Function("KafkaOutputWithHeaders")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = kevent,
        HttpResponse = response
    };
}

Eine vollständige Sammlung funktionierender .NET-Beispiele finden Sie im Kafka-Erweiterungsrepository.

Hinweis

Eine entsprechende Sammlung von TypeScript-Beispielen finden Sie im Kafka-Erweiterungsrepository.

Die spezifischen Eigenschaften der Datei „function.json“ hängen von Ihrem Ereignisanbieter ab, der in diesen Beispielen entweder Confluent oder Azure Event Hubs ist. Die folgenden Beispiele zeigen eine Kafka-Ausgabebindung für eine Funktion, die von einer HTTP-Anforderung ausgelöst wird und Daten aus der Anforderung an das Kafka-Thema sendet.

Die folgende Datei „function.json“ definiert den Trigger für den spezifischen Anbieter in diesen Beispielen:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputKafkaMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "ConfluentCloudUsername",
      "password": "ConfluentCloudPassword",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    }
  ]
}

Der folgende Code sendet dann eine Nachricht an das Thema:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message);
    context.bindings.outputKafkaMessage = message;
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: 'Ok'
    };
}

Der folgende Code sendet mehrere Nachrichten als Array an dasselbe Thema:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');
    
    context.bindings.outputKafkaMessages = ["one", "two"];
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

Das folgende Beispiel zeigt, wie eine Ereignisnachricht mit Headern an dasselbe Kafka-Thema gesendet wird:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message || (req.body && req.body.message));
    const responseMessage = message
        ? "Message received: " + message + ". The message transfered to the kafka broker."
        : "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
    context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

Eine vollständige Sammlung funktionierender JavaScript-Beispiele finden Sie im Kafka-Erweiterungsrepository.

Die spezifischen Eigenschaften der Datei „function.json“ hängen von Ihrem Ereignisanbieter ab, der in diesen Beispielen entweder Confluent oder Azure Event Hubs ist. Die folgenden Beispiele zeigen eine Kafka-Ausgabebindung für eine Funktion, die von einer HTTP-Anforderung ausgelöst wird und Daten aus der Anforderung an das Kafka-Thema sendet.

Die folgende Datei „function.json“ definiert den Trigger für den spezifischen Anbieter in diesen Beispielen:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "Response"
    }
  ]
}

Der folgende Code sendet dann eine Nachricht an das Thema:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message

$message

Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

Der folgende Code sendet mehrere Nachrichten als Array an dasselbe Thema:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

Das folgende Beispiel zeigt, wie eine Ereignisnachricht mit Headern an dasselbe Kafka-Thema gesendet wird:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message
}

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"
    }
    )
}

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'
})

Eine vollständige Sammlung funktionierender PowerShell-Beispiele finden Sie im Kafka-Erweiterungsrepository.

Die spezifischen Eigenschaften der Datei „function.json“ hängen von Ihrem Ereignisanbieter ab, der in diesen Beispielen entweder Confluent oder Azure Event Hubs ist. Die folgenden Beispiele zeigen eine Kafka-Ausgabebindung für eine Funktion, die von einer HTTP-Anforderung ausgelöst wird und Daten aus der Anforderung an das Kafka-Thema sendet.

Die folgende Datei „function.json“ definiert den Trigger für den spezifischen Anbieter in diesen Beispielen:

{
  "scriptFile": "main.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "direction": "out",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "%ConfluentCloudUserName%",
      "password": "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

Der folgende Code sendet dann eine Nachricht an das Thema:

import logging

import azure.functions as func


def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'

Der folgende Code sendet mehrere Nachrichten als Array an dasselbe Thema:

import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json

def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
    outputMessage.set(['one', 'two'])
    return 'OK'

Das folgende Beispiel zeigt, wie eine Ereignisnachricht mit Headern an dasselbe Kafka-Thema gesendet wird:

import logging

import azure.functions as func
import json

def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

Eine vollständige Sammlung funktionierender Python-Beispiele finden Sie im Kafka-Erweiterungsrepository.

Die Anmerkungen, die Sie zum Konfigurieren der Ausgabebindung verwenden, hängen vom jeweiligen Ereignisanbieter ab.

Die folgende Funktion sendet eine Nachricht an das Kafka-Thema.

@FunctionName("KafkaOutput")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");

    // Parse query parameter
    String query = request.getQueryParameters().get("message");
    String message = request.getBody().orElse(query);
    context.getLogger().info("Message:" + message);
    output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

Das folgende Beispiel zeigt, wie mehrere Nachrichten an ein Kafka-Thema gesendet werden.

@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String[]> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");
    String[] messages = new String[2];
    messages[0] = "one";
    messages[1] = "two";
    output.setValue(messages);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}

In diesem Beispiel wird der Ausgabebindungsparameter in ein Zeichenfolgenarray geändert.

Im letzten Beispiel werden diese KafkaEntity- und KafkaHeader-Klassen verwendet:

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

Die folgende Beispielfunktion sendet eine Nachricht mit Headern an ein Kafka-Thema.

@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<KafkaEntity> output,
        final ExecutionContext context) {
            context.getLogger().info("Java HTTP trigger processed a request.");
    
            // Parse query parameter
            String query = request.getQueryParameters().get("message");
            String message = request.getBody().orElse(query);
            KafkaHeaders[] headers = new KafkaHeaders[1];
            headers[0] = new KafkaHeaders("test", "java");
            KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
            output.setValue(kevent);
            return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
        }

Eine vollständige Sammlung funktionierender Java-Beispiele für Confluent finden Sie im Kafka-Erweiterungsrepository.

Attribute

Von C#-Bibliotheken des Typs In-Process und isolierter Workerprozess wird das Attribut Kafka verwendet, um die Funktion zu definieren.

In der folgenden Tabelle werden die Eigenschaften erläutert, die mithilfe dieses Attributs festgelegt werden können:

Parameter BESCHREIBUNG
BrokerList (Erforderlich) Die Liste der Kafka-Broker, an die die Ausgabe gesendet wird. Weitere Informationen finden Sie unter Verbindungen.
Thema (Erforderlich) Das Thema, an das die Ausgabe gesendet wird.
AvroSchema (Optional) Das Schema eines generischen Datensatzes, wenn das Avro-Protokoll verwendet wird.
MaxMessageBytes (Optional) Die maximale Größe der gesendeten Ausgabenachricht (in MB) mit einem Standardwert von 1.
BatchSize (Optional) Die maximale Anzahl von Nachrichten, die in einem einzelnen Nachrichtensatz mit einem Standardwert von 10000 als Batch zusammengefasst werden.
EnableIdempotence (Optional) Wenn diese Einstellung auf true festgelegt ist, wird garantiert, dass Nachrichten genau ein Mal und in der ursprünglichen Generierungsreihenfolge mit einem Standardwert von false erfolgreich generiert werden.
MessageTimeoutMs (Optional) Das lokale Nachrichtentimeout in Millisekunden. Dieser Wert wird nur lokal erzwungen und begrenzt die Zeit, die eine generierte Nachricht mit einem Standardwert von 300000 auf ihre erfolgreiche Übermittlung wartet. Eine Zeitangabe von 0 ist unendlich. Dieser Wert ist die maximale Zeit für die Übermittlung einer Nachricht (einschließlich Wiederholungsversuche). Ein Übermittlungsfehler tritt auf, wenn entweder die Anzahl der Wiederholungsversuche oder das Nachrichtentimeout überschritten wird.
RequestTimeoutMs (Optional) Das Bestätigungstimeout der Ausgabeanforderung in Millisekunden mit einem Standardwert von 5000.
MaxRetries (Optional) Die Anzahl der Wiederholungsversuche zum Senden einer fehlgeschlagenen Nachricht mit einem Standardwert von 2. Wiederholungsversuche können dazu führen, dass sich die Reihenfolge ändert, es sei denn, EnableIdempotence ist auf true festgelegt.
AuthenticationMode (Optional) Der Authentifizierungsmodus bei Verwendung der SASL-Authentifizierung (Simple Authentication and Security Layer). Unterstützt werden die Werte Gssapi, Plain (Standardwert), ScramSha256 und ScramSha512.
Benutzername (Optional) Der Benutzername für die SASL-Authentifizierung. Wird nicht unterstützt, wenn AuthenticationMode den Wert Gssapi aufweist. Weitere Informationen finden Sie unter Verbindungen.
Kennwort (Optional) Das Kennwort für die SASL-Authentifizierung. Wird nicht unterstützt, wenn AuthenticationMode den Wert Gssapi aufweist. Weitere Informationen finden Sie unter Verbindungen.
Protokoll (Optional) Das Sicherheitsprotokoll, das beim Kommunizieren mit Brokern verwendet wird. Unterstützt werden die Werte plaintext (Standardwert), ssl, sasl_plaintext und sasl_ssl.
SslCaLocation (Optional) Der Pfad zur Zertifikatdatei der Zertifizierungsstelle zum Überprüfen des Zertifikats des Brokers.
SslCertificateLocation (Optional) Der Pfad zum Zertifikat des Clients.
SslKeyLocation (Optional) Der Pfad zum privaten Schlüssel (PEM) des Clients, der für die Authentifizierung verwendet wird.
SslKeyPassword (Optional) Das Kennwort für das Zertifikat des Clients.

Anmerkungen

Mit der KafkaOutput-Anmerkung können Sie eine Funktion erstellen, die in ein bestimmtes Thema schreibt. Unterstützte Optionen umfassen die folgenden Elemente:

Element BESCHREIBUNG
name Der Name der Variablen, die die Brokerdaten im Funktionscode darstellt.
brokerList (Erforderlich) Die Liste der Kafka-Broker, an die die Ausgabe gesendet wird. Weitere Informationen finden Sie unter Verbindungen.
topic (Erforderlich) Das Thema, an das die Ausgabe gesendet wird.
dataType Definiert, wie Functions den Parameterwert verarbeitet. Standardmäßig wird der Wert als Zeichenfolge abgerufen, und Functions versucht, die Zeichenfolge in das tatsächliche POJO (Plain-Old Java Object) zu deserialisieren. Bei string wird die Eingabe nur als Zeichenfolge behandelt. Wenn binary, wird die Nachricht als Binärdaten empfangen, und Functions versucht, sie in einen tatsächlichen Parametertyp byte[] zu deserialisieren.
avroSchema (Optional) Das Schema eines generischen Datensatzes, wenn das Avro-Protokoll verwendet wird. (Derzeit nicht für Java unterstützt.)
maxMessageBytes (Optional) Die maximale Größe der gesendeten Ausgabenachricht (in MB) mit einem Standardwert von 1.
batchSize (Optional) Die maximale Anzahl von Nachrichten, die in einem einzelnen Nachrichtensatz mit einem Standardwert von 10000 als Batch zusammengefasst werden.
enableIdempotence (Optional) Wenn diese Einstellung auf true festgelegt ist, wird garantiert, dass Nachrichten genau ein Mal und in der ursprünglichen Generierungsreihenfolge mit einem Standardwert von false erfolgreich generiert werden.
messageTimeoutMs (Optional) Das lokale Nachrichtentimeout in Millisekunden. Dieser Wert wird nur lokal erzwungen und begrenzt die Zeit, die eine generierte Nachricht mit einem Standardwert von 300000 auf ihre erfolgreiche Übermittlung wartet. Eine Zeitangabe von 0 ist unendlich. Dies ist die maximale Zeit für die Übermittlung einer Nachricht (einschließlich Wiederholungsversuche). Ein Übermittlungsfehler tritt auf, wenn entweder die Anzahl der Wiederholungsversuche oder das Nachrichtentimeout überschritten wird.
requestTimeoutMs (Optional) Das Bestätigungstimeout der Ausgabeanforderung in Millisekunden mit einem Standardwert von 5000.
maxRetries (Optional) Die Anzahl der Wiederholungsversuche zum Senden einer fehlgeschlagenen Nachricht mit einem Standardwert von 2. Wiederholungsversuche können dazu führen, dass sich die Reihenfolge ändert, es sei denn, EnableIdempotence ist auf true festgelegt.
authenticationMode (Optional) Der Authentifizierungsmodus bei Verwendung der SASL-Authentifizierung (Simple Authentication and Security Layer). Unterstützt werden die Werte Gssapi, Plain (Standardwert), ScramSha256 und ScramSha512.
username (Optional) Der Benutzername für die SASL-Authentifizierung. Wird nicht unterstützt, wenn AuthenticationMode den Wert Gssapi aufweist. Weitere Informationen finden Sie unter Verbindungen.
password (Optional) Das Kennwort für die SASL-Authentifizierung. Wird nicht unterstützt, wenn AuthenticationMode den Wert Gssapi aufweist. Weitere Informationen finden Sie unter Verbindungen.
protocol (Optional) Das Sicherheitsprotokoll, das beim Kommunizieren mit Brokern verwendet wird. Unterstützt werden die Werte plaintext (Standardwert), ssl, sasl_plaintext und sasl_ssl.
sslCaLocation (Optional) Der Pfad zur Zertifikatdatei der Zertifizierungsstelle zum Überprüfen des Zertifikats des Brokers.
sslCertificateLocation (Optional) Der Pfad zum Zertifikat des Clients.
sslKeyLocation (Optional) Der Pfad zum privaten Schlüssel (PEM) des Clients, der für die Authentifizierung verwendet wird.
sslKeyPassword (Optional) Das Kennwort für das Zertifikat des Clients.

Konfiguration

Die folgende Tabelle gibt Aufschluss über die Bindungskonfigurationseigenschaften, die Sie in der Datei function.json festlegen.

function.json-Eigenschaft BESCHREIBUNG
type Muss auf kafka festgelegt sein.
direction Muss auf out festgelegt sein.
name Der Name der Variablen, die die Brokerdaten im Funktionscode darstellt.
brokerList (Erforderlich) Die Liste der Kafka-Broker, an die die Ausgabe gesendet wird. Weitere Informationen finden Sie unter Verbindungen.
topic (Erforderlich) Das Thema, an das die Ausgabe gesendet wird.
avroSchema (Optional) Das Schema eines generischen Datensatzes, wenn das Avro-Protokoll verwendet wird.
maxMessageBytes (Optional) Die maximale Größe der gesendeten Ausgabenachricht (in MB) mit einem Standardwert von 1.
batchSize (Optional) Die maximale Anzahl von Nachrichten, die in einem einzelnen Nachrichtensatz mit einem Standardwert von 10000 als Batch zusammengefasst werden.
enableIdempotence (Optional) Wenn diese Einstellung auf true festgelegt ist, wird garantiert, dass Nachrichten genau ein Mal und in der ursprünglichen Generierungsreihenfolge mit einem Standardwert von false erfolgreich generiert werden.
messageTimeoutMs (Optional) Das lokale Nachrichtentimeout in Millisekunden. Dieser Wert wird nur lokal erzwungen und begrenzt die Zeit, die eine generierte Nachricht mit einem Standardwert von 300000 auf ihre erfolgreiche Übermittlung wartet. Eine Zeitangabe von 0 ist unendlich. Dies ist die maximale Zeit für die Übermittlung einer Nachricht (einschließlich Wiederholungsversuche). Ein Übermittlungsfehler tritt auf, wenn entweder die Anzahl der Wiederholungsversuche oder das Nachrichtentimeout überschritten wird.
requestTimeoutMs (Optional) Das Bestätigungstimeout der Ausgabeanforderung in Millisekunden mit einem Standardwert von 5000.
maxRetries (Optional) Die Anzahl der Wiederholungsversuche zum Senden einer fehlgeschlagenen Nachricht mit einem Standardwert von 2. Wiederholungsversuche können dazu führen, dass sich die Reihenfolge ändert, es sei denn, EnableIdempotence ist auf true festgelegt.
authenticationMode (Optional) Der Authentifizierungsmodus bei Verwendung der SASL-Authentifizierung (Simple Authentication and Security Layer). Unterstützt werden die Werte Gssapi, Plain (Standardwert), ScramSha256 und ScramSha512.
username (Optional) Der Benutzername für die SASL-Authentifizierung. Wird nicht unterstützt, wenn AuthenticationMode den Wert Gssapi aufweist. Weitere Informationen finden Sie unter Verbindungen.
password (Optional) Das Kennwort für die SASL-Authentifizierung. Wird nicht unterstützt, wenn AuthenticationMode den Wert Gssapi aufweist. Weitere Informationen finden Sie unter Verbindungen.
protocol (Optional) Das Sicherheitsprotokoll, das beim Kommunizieren mit Brokern verwendet wird. Unterstützt werden die Werte plaintext (Standardwert), ssl, sasl_plaintext und sasl_ssl.
sslCaLocation (Optional) Der Pfad zur Zertifikatdatei der Zertifizierungsstelle zum Überprüfen des Zertifikats des Brokers.
sslCertificateLocation (Optional) Der Pfad zum Zertifikat des Clients.
sslKeyLocation (Optional) Der Pfad zum privaten Schlüssel (PEM) des Clients, der für die Authentifizierung verwendet wird.
sslKeyPassword (Optional) Das Kennwort für das Zertifikat des Clients.

Verwendung

Sowohl Schlüssel als auch Werttypen werden durch die integrierte Avro- und Protobuf-Serialisierung unterstützt.

Der Offset, die Partition und der Zeitstempel für das Ereignis werden zur Laufzeit generiert. Nur Werte und Header können innerhalb der Funktion festgelegt werden. Das Thema wird in der Datei „function.json“ festgelegt.

Stellen Sie sicher, dass Sie Zugriff auf das Kafka-Thema besitzen, in das Sie schreiben möchten. Sie konfigurieren die Bindung mit Zugriffs- und Verbindungsanmeldeinformationen für das Kafka-Thema.

In einem Premium-Plan müssen Sie die Laufzeitskalierungsüberwachung für die Kafka-Ausgabe aktivieren, um auf mehrere Instanzen aufzuskalieren. Weitere Informationen finden Sie unter Aktivieren der Laufzeitskalierung.

Eine vollständige Liste der unterstützten host.json-Einstellungen für den Kafka-Trigger finden Sie unter host.json-Einstellungen.

Verbindungen

Alle von Ihren Triggern und Bindungen benötigten Verbindungsinformationen sollten in den Anwendungseinstellungen und nicht in den Bindungsdefinitionen in Ihrem Code verwaltet werden. Dies gilt auch für Anmeldeinformationen, die niemals in Ihrem Code gespeichert werden sollten.

Wichtig

Einstellungen für Anmeldeinformationen müssen auf eine Anwendungseinstellung verweisen. Stellen Sie keine hartcodierten Anmeldeinformationen in Ihrem Code oder Ihren Konfigurationsdateien bereit. Wenn die Ausführung lokal erfolgt, verwenden Sie die Datei local.settings.json für Ihre Anmeldeinformationen, und veröffentlichen Sie die Datei „local.settings.json“ nicht.

Wenn Sie eine Verbindung mit einem von Confluent in Azure bereitgestellten verwalteten Kafka-Cluster herstellen, stellen Sie sicher, dass die folgenden Authentifizierungsanmeldeinformationen für Ihre Confluent Cloud-Umgebung in Ihrem Trigger oder Ihrer Bindung festgelegt sind:

Einstellung Empfohlener Wert BESCHREIBUNG
BrokerList BootstrapServer Die App-Einstellung namens BootstrapServer enthält den Wert des Bootstrapservers, der auf der Seite mit den Confluent Cloud-Einstellungen gefunden wurde. Der Wert ähnelt xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Benutzername ConfluentCloudUsername Die App-Einstellung namens ConfluentCloudUsername enthält den API-Zugriffsschlüssel von der Confluent Cloud-Website.
Kennwort ConfluentCloudPassword Die App-Einstellung namens ConfluentCloudPassword enthält das API-Geheimnis, das von der Confluent Cloud-Website abgerufen wurde.

Die für diese Einstellungen verwendeten Zeichenfolgenwerte müssen während der lokalen Entwicklung als Anwendungseinstellungen in Azure oder in der Values-Sammlung in der Datei local.settings.json vorhanden sein.

Sie sollten auch Protocol, AuthenticationMode und SslCaLocation in Ihren Bindungsdefinitionen festlegen.

Nächste Schritte