Udostępnij za pośrednictwem


Powiązanie wyjściowe platformy Apache Kafka dla usługi Azure Functions

Powiązanie wyjściowe umożliwia aplikacji usługi Azure Functions zapisywanie komunikatów w temacie platformy Kafka.

Ważne

Powiązania platformy Kafka są dostępne tylko dla funkcji w ramach planu Elastic Premium i dedykowanego (App Service). Są one obsługiwane tylko w wersji 3.x i nowszej środowiska uruchomieniowego usługi Functions.

Przykład

Użycie powiązania zależy od modalności języka C# używanej w aplikacji funkcji, co może być jednym z następujących elementów:

Izolowana biblioteka klas procesów roboczych skompilowana funkcja języka C# jest uruchamiana w procesie odizolowanym od środowiska uruchomieniowego.

Używane atrybuty zależą od określonego dostawcy zdarzeń.

Poniższy przykład ma niestandardowy typ zwracany, czyli MultipleOutputType, który składa się z odpowiedzi HTTP i danych wyjściowych platformy Kafka.

[Function("KafkaOutput")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();

    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = message,
        HttpResponse = response
    };
}

W klasie MultipleOutputTypeKevent jest zmienną powiązania wyjściowego dla powiązania platformy Kafka.

public class MultipleOutputType
{
    [KafkaOutput("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string Kevent { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Aby wysłać partię zdarzeń, przekaż tablicę ciągów do typu danych wyjściowych, jak pokazano w poniższym przykładzie:

[Function("KafkaOutputMany")]

public static MultipleOutputTypeForBatch Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");
    var response = req.CreateResponse(HttpStatusCode.OK);

    string[] messages = new string[2];
    messages[0] = "one";
    messages[1] = "two";

    return new MultipleOutputTypeForBatch()
    {
        Kevents = messages,
        HttpResponse = response
    };
}

Tablica ciągów jest definiowana jako Kevents właściwość klasy, na której zdefiniowano powiązanie wyjściowe:

public class MultipleOutputTypeForBatch
{
    [KafkaOutput("BrokerList",
                 "topic",
                 Username = "ConfluentCloudUserName",
                 Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string[] Kevents { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Następująca funkcja dodaje nagłówki do danych wyjściowych platformy Kafka:

[Function("KafkaOutputWithHeaders")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = kevent,
        HttpResponse = response
    };
}

Pełny zestaw działających przykładów platformy .NET można znaleźć w repozytorium rozszerzeń platformy Kafka.

Uwaga

Aby zapoznać się z równoważnym zestawem przykładów języka TypeScript, zobacz repozytorium rozszerzeń platformy Kafka

Określone właściwości pliku function.json zależą od dostawcy zdarzeń, który w tych przykładach to Confluent lub Azure Event Hubs. W poniższych przykładach pokazano powiązanie danych wyjściowych platformy Kafka dla funkcji wyzwalanej przez żądanie HTTP i wysyłanie danych z żądania do tematu platformy Kafka.

W poniższych function.json zdefiniowano wyzwalacz dla określonego dostawcy w następujących przykładach:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputKafkaMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "ConfluentCloudUsername",
      "password": "ConfluentCloudPassword",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    }
  ]
}

Poniższy kod wysyła następnie komunikat do tematu:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message);
    context.bindings.outputKafkaMessage = message;
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: 'Ok'
    };
}

Poniższy kod wysyła wiele komunikatów jako tablicę do tego samego tematu:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');
    
    context.bindings.outputKafkaMessages = ["one", "two"];
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

W poniższym przykładzie pokazano, jak wysłać komunikat zdarzenia z nagłówkami do tego samego tematu platformy Kafka:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message || (req.body && req.body.message));
    const responseMessage = message
        ? "Message received: " + message + ". The message transfered to the kafka broker."
        : "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
    context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

Pełny zestaw działających przykładów języka JavaScript można znaleźć w repozytorium rozszerzeń platformy Kafka.

Określone właściwości pliku function.json zależą od dostawcy zdarzeń, który w tych przykładach to Confluent lub Azure Event Hubs. W poniższych przykładach pokazano powiązanie danych wyjściowych platformy Kafka dla funkcji wyzwalanej przez żądanie HTTP i wysyłanie danych z żądania do tematu platformy Kafka.

W poniższych function.json zdefiniowano wyzwalacz dla określonego dostawcy w następujących przykładach:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "Response"
    }
  ]
}

Poniższy kod wysyła następnie komunikat do tematu:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message

$message

Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

Poniższy kod wysyła wiele komunikatów jako tablicę do tego samego tematu:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

W poniższym przykładzie pokazano, jak wysłać komunikat zdarzenia z nagłówkami do tego samego tematu platformy Kafka:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message
}

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"
    }
    )
}

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'
})

Pełny zestaw działających przykładów programu PowerShell można znaleźć w repozytorium rozszerzeń platformy Kafka.

Określone właściwości pliku function.json zależą od dostawcy zdarzeń, który w tych przykładach to Confluent lub Azure Event Hubs. W poniższych przykładach pokazano powiązanie danych wyjściowych platformy Kafka dla funkcji wyzwalanej przez żądanie HTTP i wysyłanie danych z żądania do tematu platformy Kafka.

W poniższych function.json zdefiniowano wyzwalacz dla określonego dostawcy w następujących przykładach:

{
  "scriptFile": "main.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "direction": "out",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "%ConfluentCloudUserName%",
      "password": "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

Poniższy kod wysyła następnie komunikat do tematu:

import logging

import azure.functions as func


def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'

Poniższy kod wysyła wiele komunikatów jako tablicę do tego samego tematu:

import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json

def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
    outputMessage.set(['one', 'two'])
    return 'OK'

W poniższym przykładzie pokazano, jak wysłać komunikat zdarzenia z nagłówkami do tego samego tematu platformy Kafka:

import logging

import azure.functions as func
import json

def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

Pełny zestaw działających przykładów języka Python można znaleźć w repozytorium rozszerzeń platformy Kafka.

Adnotacje używane do konfigurowania powiązania wyjściowego zależą od określonego dostawcy zdarzeń.

Poniższa funkcja wysyła komunikat do tematu platformy Kafka.

@FunctionName("KafkaOutput")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");

    // Parse query parameter
    String query = request.getQueryParameters().get("message");
    String message = request.getBody().orElse(query);
    context.getLogger().info("Message:" + message);
    output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

W poniższym przykładzie pokazano, jak wysyłać wiele komunikatów do tematu platformy Kafka.

@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String[]> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");
    String[] messages = new String[2];
    messages[0] = "one";
    messages[1] = "two";
    output.setValue(messages);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}

W tym przykładzie parametr powiązania wyjściowego został zmieniony na tablicę ciągów.

W ostatnim przykładzie użyto następujących KafkaEntity klas i KafkaHeader :

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

Poniższa przykładowa funkcja wysyła komunikat z nagłówkami do tematu platformy Kafka.

@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<KafkaEntity> output,
        final ExecutionContext context) {
            context.getLogger().info("Java HTTP trigger processed a request.");
    
            // Parse query parameter
            String query = request.getQueryParameters().get("message");
            String message = request.getBody().orElse(query);
            KafkaHeaders[] headers = new KafkaHeaders[1];
            headers[0] = new KafkaHeaders("test", "java");
            KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
            output.setValue(kevent);
            return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
        }

Pełny zestaw działających przykładów języka Java dla platformy Confluent można znaleźć w repozytorium rozszerzeń platformy Kafka.

Atrybuty

Biblioteki języka C# procesu roboczego w procesie przetwarzania procesów przetwarzania procesów procesów przetwarzania w procesie przetwarzania izolowanego używają atrybutu Kafka do zdefiniowania wyzwalacza funkcji.

W poniższej tabeli opisano właściwości, które można ustawić przy użyciu tego atrybutu:

Parametr Opis
Lista brokerów (Wymagane) Lista brokerów platformy Kafka, do których są wysyłane dane wyjściowe. Aby uzyskać więcej informacji, zobacz Połączenia .
Temat (Wymagane) Temat, do którego są wysyłane dane wyjściowe.
AvroSchema (Opcjonalnie) Schemat rekordu ogólnego w przypadku korzystania z protokołu Avro.
MaxMessageBytes (Opcjonalnie) Maksymalny rozmiar wysyłanego komunikatu wyjściowego (w MB) z wartością 1domyślną .
BatchSize (Opcjonalnie) Maksymalna liczba komunikatów wsadowych w jednym zestawie komunikatów z wartością 10000domyślną .
EnableIdempotence (Opcjonalnie) W przypadku ustawienia opcji truegwarantuje, że komunikaty są generowane dokładnie raz i w oryginalnej kolejności produkcji z wartością domyślną false
MessageTimeoutMs (Opcjonalnie) Limit czasu komunikatu lokalnego( w milisekundach). Ta wartość jest wymuszana tylko lokalnie i ogranicza czas oczekiwania wygenerowanego komunikatu na pomyślne dostarczenie z domyślną wartością 300000. Czas 0 jest nieskończony. Ta wartość to maksymalny czas używany do dostarczenia komunikatu (w tym ponownych prób). Błąd dostarczania występuje, gdy liczba ponownych prób lub przekroczenie limitu czasu komunikatu.
RequestTimeoutMs (Opcjonalnie) Limit czasu potwierdzenia żądania wyjściowego (w milisekundach) z wartością domyślną 5000.
Maksymalna liczba ponownych prób (Opcjonalnie) Liczba ponownych prób wysłania komunikatu zakończonego niepowodzeniem z wartością domyślną 2. Ponawianie próby może spowodować zmiana kolejności, chyba że EnableIdempotence ustawiono wartość true.
AuthenticationMode (Opcjonalnie) Tryb uwierzytelniania podczas korzystania z uwierzytelniania prostego i warstwy zabezpieczeń (SASL). Obsługiwane wartości to Gssapi, Plain (wartość domyślna), ScramSha256, ScramSha512.
Nazwa użytkownika (Opcjonalnie) Nazwa użytkownika uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi. Aby uzyskać więcej informacji, zobacz Połączenia .
Hasło (Opcjonalnie) Hasło do uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi. Aby uzyskać więcej informacji, zobacz Połączenia .
Protokół (Opcjonalnie) Protokół zabezpieczeń używany podczas komunikacji z brokerami. Obsługiwane wartości to plaintext (wartość domyślna), ssl, sasl_plaintext, sasl_ssl.
SslCaLocation (Opcjonalnie) Ścieżka do pliku certyfikatu urzędu certyfikacji na potrzeby weryfikowania certyfikatu brokera.
SslCertificateLocation (Opcjonalnie) Ścieżka do certyfikatu klienta.
SslKeyLocation (Opcjonalnie) Ścieżka do klucza prywatnego klienta (PEM) używanego do uwierzytelniania.
SslKeyPassword (Opcjonalnie) Hasło do certyfikatu klienta.

Adnotacje

Adnotacja KafkaOutput umożliwia utworzenie funkcji, która zapisuje w określonym temacie. Obsługiwane opcje obejmują następujące elementy:

Element opis
name Nazwa zmiennej reprezentującej dane obsługiwane przez brokera w kodzie funkcji.
brokerList (Wymagane) Lista brokerów platformy Kafka, do których są wysyłane dane wyjściowe. Aby uzyskać więcej informacji, zobacz Połączenia .
topic (Wymagane) Temat, do którego są wysyłane dane wyjściowe.
Datatype Definiuje sposób obsługi wartości parametru przez funkcję Functions. Domyślnie wartość jest uzyskiwana jako ciąg, a usługa Functions próbuje wykonać deserializacji ciągu do rzeczywistego zwykłego obiektu Java (POJO). Gdy stringelement wejściowy jest traktowany jako tylko ciąg. Gdy binarykomunikat zostanie odebrany jako dane binarne, a usługa Functions próbuje wykonać deserializacji go do rzeczywistego bajtu typu parametru[].
avroSchema (Opcjonalnie) Schemat rekordu ogólnego w przypadku korzystania z protokołu Avro. (Obecnie nieobsługiwane dla języka Java).
maxMessageBytes (Opcjonalnie) Maksymalny rozmiar wysyłanego komunikatu wyjściowego (w MB) z wartością 1domyślną .
batchSize (Opcjonalnie) Maksymalna liczba komunikatów wsadowych w jednym zestawie komunikatów z wartością 10000domyślną .
enableIdempotence (Opcjonalnie) W przypadku ustawienia opcji truegwarantuje, że komunikaty są generowane dokładnie raz i w oryginalnej kolejności produkcji z wartością domyślną false
messageTimeoutMs (Opcjonalnie) Limit czasu komunikatu lokalnego( w milisekundach). Ta wartość jest wymuszana tylko lokalnie i ogranicza czas oczekiwania wygenerowanego komunikatu na pomyślne dostarczenie z domyślną wartością 300000. Czas 0 jest nieskończony. Jest to maksymalny czas używany do dostarczania komunikatu (w tym ponownych prób). Błąd dostarczania występuje, gdy liczba ponownych prób lub przekroczenie limitu czasu komunikatu.
requestTimeoutMs (Opcjonalnie) Limit czasu potwierdzenia żądania wyjściowego (w milisekundach) z wartością domyślną 5000.
maxRetries (Opcjonalnie) Liczba ponownych prób wysłania komunikatu zakończonego niepowodzeniem z wartością domyślną 2. Ponawianie próby może spowodować zmiana kolejności, chyba że EnableIdempotence ustawiono wartość true.
authenticationMode (Opcjonalnie) Tryb uwierzytelniania podczas korzystania z uwierzytelniania prostego i warstwy zabezpieczeń (SASL). Obsługiwane wartości to Gssapi, Plain (wartość domyślna), ScramSha256, ScramSha512.
nazwa użytkownika (Opcjonalnie) Nazwa użytkownika uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi. Aby uzyskać więcej informacji, zobacz Połączenia .
hasło (Opcjonalnie) Hasło do uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi. Aby uzyskać więcej informacji, zobacz Połączenia .
protokół (Opcjonalnie) Protokół zabezpieczeń używany podczas komunikacji z brokerami. Obsługiwane wartości to plaintext (wartość domyślna), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Opcjonalnie) Ścieżka do pliku certyfikatu urzędu certyfikacji na potrzeby weryfikowania certyfikatu brokera.
sslCertificateLocation (Opcjonalnie) Ścieżka do certyfikatu klienta.
sslKeyLocation (Opcjonalnie) Ścieżka do klucza prywatnego klienta (PEM) używanego do uwierzytelniania.
sslKeyPassword (Opcjonalnie) Hasło do certyfikatu klienta.

Konfigurowanie

W poniższej tabeli opisano właściwości konfiguracji powiązania ustawione w pliku function.json .

właściwość function.json opis
type Musi być ustawiona wartość kafka.
direction Musi być ustawiona wartość out.
name Nazwa zmiennej reprezentującej dane obsługiwane przez brokera w kodzie funkcji.
brokerList (Wymagane) Lista brokerów platformy Kafka, do których są wysyłane dane wyjściowe. Aby uzyskać więcej informacji, zobacz Połączenia .
topic (Wymagane) Temat, do którego są wysyłane dane wyjściowe.
avroSchema (Opcjonalnie) Schemat rekordu ogólnego w przypadku korzystania z protokołu Avro.
maxMessageBytes (Opcjonalnie) Maksymalny rozmiar wysyłanego komunikatu wyjściowego (w MB) z wartością 1domyślną .
batchSize (Opcjonalnie) Maksymalna liczba komunikatów wsadowych w jednym zestawie komunikatów z wartością 10000domyślną .
enableIdempotence (Opcjonalnie) W przypadku ustawienia opcji truegwarantuje, że komunikaty są generowane dokładnie raz i w oryginalnej kolejności produkcji z wartością domyślną false
messageTimeoutMs (Opcjonalnie) Limit czasu komunikatu lokalnego( w milisekundach). Ta wartość jest wymuszana tylko lokalnie i ogranicza czas oczekiwania wygenerowanego komunikatu na pomyślne dostarczenie z domyślną wartością 300000. Czas 0 jest nieskończony. Jest to maksymalny czas używany do dostarczania komunikatu (w tym ponownych prób). Błąd dostarczania występuje, gdy liczba ponownych prób lub przekroczenie limitu czasu komunikatu.
requestTimeoutMs (Opcjonalnie) Limit czasu potwierdzenia żądania wyjściowego (w milisekundach) z wartością domyślną 5000.
maxRetries (Opcjonalnie) Liczba ponownych prób wysłania komunikatu zakończonego niepowodzeniem z wartością domyślną 2. Ponawianie próby może spowodować zmiana kolejności, chyba że EnableIdempotence ustawiono wartość true.
authenticationMode (Opcjonalnie) Tryb uwierzytelniania podczas korzystania z uwierzytelniania prostego i warstwy zabezpieczeń (SASL). Obsługiwane wartości to Gssapi, Plain (wartość domyślna), ScramSha256, ScramSha512.
nazwa użytkownika (Opcjonalnie) Nazwa użytkownika uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi. Aby uzyskać więcej informacji, zobacz Połączenia .
hasło (Opcjonalnie) Hasło do uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi. Aby uzyskać więcej informacji, zobacz Połączenia .
protokół (Opcjonalnie) Protokół zabezpieczeń używany podczas komunikacji z brokerami. Obsługiwane wartości to plaintext (wartość domyślna), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Opcjonalnie) Ścieżka do pliku certyfikatu urzędu certyfikacji na potrzeby weryfikowania certyfikatu brokera.
sslCertificateLocation (Opcjonalnie) Ścieżka do certyfikatu klienta.
sslKeyLocation (Opcjonalnie) Ścieżka do klucza prywatnego klienta (PEM) używanego do uwierzytelniania.
sslKeyPassword (Opcjonalnie) Hasło do certyfikatu klienta.

Użycie

Zarówno klucze, jak i typy wartości są obsługiwane z wbudowaną serializacji Avro i Protobuf .

Przesunięcie, partycja i sygnatura czasowa zdarzenia są generowane w czasie wykonywania. W funkcji można ustawić tylko wartości i nagłówki. Temat jest ustawiony w function.json.

Upewnij się, że masz dostęp do tematu platformy Kafka, do którego próbujesz napisać. Powiązanie należy skonfigurować przy użyciu poświadczeń dostępu i połączenia do tematu platformy Kafka.

W planie Premium należy włączyć monitorowanie skalowania w czasie wykonywania dla danych wyjściowych platformy Kafka, aby móc skalować w poziomie do wielu wystąpień. Aby dowiedzieć się więcej, zobacz Włączanie skalowania środowiska uruchomieniowego.

Aby uzyskać pełny zestaw obsługiwanych ustawień host.json wyzwalacza platformy Kafka, zobacz host.json ustawienia.

Połączenia

Wszystkie informacje o połączeniu wymagane przez wyzwalacze i powiązania powinny być przechowywane w ustawieniach aplikacji, a nie w definicjach powiązań w kodzie. Dotyczy to poświadczeń, które nigdy nie powinny być przechowywane w kodzie.

Ważne

Ustawienia poświadczeń muszą odwoływać się do ustawienia aplikacji. Nie należy zapisywać poświadczeń w kodzie ani plikach konfiguracji. W przypadku uruchamiania lokalnego użyj pliku local.settings.json dla poświadczeń i nie publikuj pliku local.settings.json.

Podczas nawiązywania połączenia z zarządzanym klastrem Platformy Kafka udostępnianym przez platformę Confluent na platformie Azure upewnij się, że następujące poświadczenia uwierzytelniania dla środowiska platformy Confluent Cloud zostały ustawione w wyzwalaczu lub powiązaniu:

Ustawienie Zalecana wartość opis
Lista brokerów BootstrapServer Ustawienie aplikacji o nazwie BootstrapServer zawiera wartość serwera bootstrap znalezionego na stronie ustawień chmury Confluent. Wartość przypomina xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Nazwa użytkownika ConfluentCloudUsername Ustawienie aplikacji o nazwie ConfluentCloudUsername zawiera klucz dostępu interfejsu API z witryny internetowej Confluent Cloud.
Hasło ConfluentCloudPassword Ustawienie aplikacji o nazwie ConfluentCloudPassword zawiera wpis tajny interfejsu API uzyskany z witryny internetowej platformy Confluent Cloud.

Wartości ciągu używane dla tych ustawień muszą być obecne jako ustawienia aplikacji na platformie Azure lub w Values kolekcji w pliku local.settings.json podczas programowania lokalnego.

Należy również ustawić Protocoldefinicje powiązań , AuthenticationModei SslCaLocation .

Następne kroki