Поделиться через


Привязка для вывода Apache Kafka для службы "Функции Azure"

Выходная привязка позволяет приложению Функции Azure записывать сообщения в раздел Kafka.

Внимание

Привязки Kafka доступны только для Функций в составе эластичного плана "Премиум" и плана "Выделенный (Служба приложений)". Они поддерживаются только в среде выполнения Функций версии 3.x и выше.

Пример

Использование привязки зависит от модальности C#, используемой в приложении-функции. Это может быть один из следующих вариантов:

Изолированная библиотека классов рабочих процессов, скомпилированная функция C# выполняется в процессе, изолированном от среды выполнения.

Используемые атрибуты зависят от конкретного поставщика событий.

В следующем примере имеется пользовательский тип возвращаемого значения MultipleOutputType, который состоит из HTTP-ответа и выходных данных Kafka.

[Function("KafkaOutput")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();

    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = message,
        HttpResponse = response
    };
}

В классе MultipleOutputType Kevent — это переменная выходной привязки для привязки Kafka.

public class MultipleOutputType
{
    [KafkaOutput("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string Kevent { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Чтобы отправить пакет событий, передайте массив строк в тип выходных данных, как показано в следующем примере:

[Function("KafkaOutputMany")]

public static MultipleOutputTypeForBatch Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");
    var response = req.CreateResponse(HttpStatusCode.OK);

    string[] messages = new string[2];
    messages[0] = "one";
    messages[1] = "two";

    return new MultipleOutputTypeForBatch()
    {
        Kevents = messages,
        HttpResponse = response
    };
}

Массив строк определяется как Kevents свойство класса, для которого определена выходная привязка:

public class MultipleOutputTypeForBatch
{
    [KafkaOutput("BrokerList",
                 "topic",
                 Username = "ConfluentCloudUserName",
                 Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string[] Kevents { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Следующая функция добавляет заголовки в выходные данные Kafka:

[Function("KafkaOutputWithHeaders")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = kevent,
        HttpResponse = response
    };
}

Полный набор рабочих примеров .NET см. в репозитории расширений Kafka.

Примечание.

Эквивалентный набор примеров TypeScript см. в репозитории расширений Kafka

Конкретные свойства файла function.json зависят от поставщика событий. В данных примерах это Confluent или Центры событий Azure. В следующих примерах показана выходная привязка Kafka для функции, которая активируется HTTP-запросом и отправляет данные из запроса в раздел Kafka.

Следующий файл function.json определяет триггер для конкретного поставщика в этих примерах:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputKafkaMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "ConfluentCloudUsername",
      "password": "ConfluentCloudPassword",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    }
  ]
}

Затем следующий код отправляет сообщение в раздел:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message);
    context.bindings.outputKafkaMessage = message;
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: 'Ok'
    };
}

Следующий код отправляет несколько сообщений в виде массива в один и тот же раздел:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');
    
    context.bindings.outputKafkaMessages = ["one", "two"];
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

В следующем примере показано, как отправить сообщение о событии с заголовками в тот же раздел Kafka:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message || (req.body && req.body.message));
    const responseMessage = message
        ? "Message received: " + message + ". The message transfered to the kafka broker."
        : "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
    context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

Полный набор рабочих примеров JavaScript см. в репозитории расширений Kafka.

Конкретные свойства файла function.json зависят от поставщика событий. В данных примерах это Confluent или Центры событий Azure. В следующих примерах показана выходная привязка Kafka для функции, которая активируется HTTP-запросом и отправляет данные из запроса в раздел Kafka.

Следующий файл function.json определяет триггер для конкретного поставщика в этих примерах:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "Response"
    }
  ]
}

Затем следующий код отправляет сообщение в раздел:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message

$message

Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

Следующий код отправляет несколько сообщений в виде массива в один и тот же раздел:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

В следующем примере показано, как отправить сообщение о событии с заголовками в тот же раздел Kafka:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message
}

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"
    }
    )
}

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'
})

Полный набор рабочих примеров PowerShell см. в репозитории расширений Kafka.

Конкретные свойства файла function.json зависят от поставщика событий. В данных примерах это Confluent или Центры событий Azure. В следующих примерах показана выходная привязка Kafka для функции, которая активируется HTTP-запросом и отправляет данные из запроса в раздел Kafka.

Следующий файл function.json определяет триггер для конкретного поставщика в этих примерах:

{
  "scriptFile": "main.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "direction": "out",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "%ConfluentCloudUserName%",
      "password": "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

Затем следующий код отправляет сообщение в раздел:

import logging

import azure.functions as func


def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'

Следующий код отправляет несколько сообщений в виде массива в один и тот же раздел:

import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json

def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
    outputMessage.set(['one', 'two'])
    return 'OK'

В следующем примере показано, как отправить сообщение о событии с заголовками в тот же раздел Kafka:

import logging

import azure.functions as func
import json

def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

Полный набор рабочих примеров Python см. в репозитории расширений Kafka.

Заметки, используемые для настройки выходной привязки, зависят от конкретного поставщика событий.

Следующая функция отправляет сообщение в раздел Kafka.

@FunctionName("KafkaOutput")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");

    // Parse query parameter
    String query = request.getQueryParameters().get("message");
    String message = request.getBody().orElse(query);
    context.getLogger().info("Message:" + message);
    output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

В следующем примере показано, как отправить несколько сообщений в раздел Kafka.

@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String[]> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");
    String[] messages = new String[2];
    messages[0] = "one";
    messages[1] = "two";
    output.setValue(messages);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}

В этом примере параметр выходной привязки изменяется на массив строк.

Последний пример используется для следующих классов KafkaEntity и KafkaHeader:

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

В следующем примере функция отправляет сообщение с заголовками в раздел Kafka.

@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<KafkaEntity> output,
        final ExecutionContext context) {
            context.getLogger().info("Java HTTP trigger processed a request.");
    
            // Parse query parameter
            String query = request.getQueryParameters().get("message");
            String message = request.getBody().orElse(query);
            KafkaHeaders[] headers = new KafkaHeaders[1];
            headers[0] = new KafkaHeaders("test", "java");
            KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
            output.setValue(kevent);
            return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
        }

Полный набор рабочих примеров Java для Confluent см. в репозитории расширений Kafka.

Атрибуты

Библиотеки C# в процессе и изолированном рабочем процессе используют Kafka атрибут для определения триггера функции.

В следующей таблице описаны свойства, которые можно задать с помощью этого атрибута:

Параметр Описание
BrokerList (Обязательно) Список брокеров Kafka, на которые отправляются выходные данные. Дополнительные сведения см. в разделе Подключения.
Раздел (Обязательно) Раздел, в который направляются выходные данные.
AvroSchema (Необязательно) Схема универсальной записи при использовании протокола Avro.
MaxMessageBytes (Необязательно) Максимальный размер отправляемого выходного сообщения (в МБ) со значением 1 по умолчанию.
BatchSize (Необязательно) Максимальное количество сообщений, пакетированных в одном наборе сообщений, со значением 10000 по умолчанию.
EnableIdempotence (Необязательно) Если задано значение true, гарантирует, что сообщения успешно создаются ровно один раз и в исходном порядке создания со значением по умолчанию false
MessageTimeoutMs (Необязательно) Время ожидания локального сообщения в миллисекундах. Это значение применяется только локально и ограничивает время ожидания сообщением успешной доставки с заданным по умолчанию 300000. Время 0 не ограничено. Это значение — максимальное время, используемое для доставки сообщения (включая повторные попытки). Ошибка доставки возникает при превышении количества повторных попыток или превышения времени ожидания сообщения.
RequestTimeoutMs (Необязательно) Время ожидания подтверждения для выходного запроса в миллисекундах со значением по умолчанию 5000.
MaxRetries (Необязательно) Число повторных попыток отправки неудачного сообщения со значением по умолчанию 2. Повторная попытка может привести к переупорядочению, если EnableIdempotence не задано как true.
AuthenticationMode (Необязательно) Режим, используемый при простой проверке подлинности и проверке подлинности уровня безопасности (SASL). Поддерживаемые значения: Gssapi, Plain (по умолчанию), ScramSha256, ScramSha512.
Username (Необязательно) Имя пользователя для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi. Дополнительные сведения см. в разделе Подключения.
Пароль (Необязательно) Пароль для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi. Дополнительные сведения см. в разделе Подключения.
Протокол (Необязательно) Протокол безопасности, используемый при взаимодействии с брокерами. Поддерживаемые значения: plaintext (по умолчанию), ssl, sasl_plaintext, sasl_ssl.
SslCaLocation (Необязательно) Путь к файлу сертификата ЦС для проверки сертификата брокера.
SslCertificateLocation (Необязательно) Путь к сертификату клиента.
SslKeyLocation (Необязательно) Путь к закрытому ключу клиента (PEM), используемому для проверки подлинности.
SslKeyPassword (Необязательно) Пароль для сертификата клиента.

Заметки

Заметка KafkaOutput позволяет создать функцию, которая записывает данные в определенный раздел. Поддерживаемые варианты включают следующие элементы:

Элемент Description
name Имя переменной, представляющей данные, полученные через брокер, в коде функции.
brokerList (Обязательно) Список брокеров Kafka, на которые отправляются выходные данные. Дополнительные сведения см. в разделе Подключения.
topic (Обязательно) Раздел, в который направляются выходные данные.
dataType Определяет, как Функции обрабатывают значение параметра. По умолчанию полученное значение представляет собой строку, и Функции пытаются десериализовать ее до объекта POJO. При string входные данные обрабатываются просто как строка. При binary сообщение приходит в формате двоичных данных и Функции пытаются десериализовать его до фактического типа параметра byte[].
avroSchema (Необязательно) Схема универсальной записи при использовании протокола Avro. (В настоящее время не поддерживается для Java.)
maxMessageBytes (Необязательно) Максимальный размер отправляемого выходного сообщения (в МБ) со значением 1 по умолчанию.
batchSize (Необязательно) Максимальное количество сообщений, пакетированных в одном наборе сообщений, со значением 10000 по умолчанию.
EnableIdempotence (Необязательно) Если задано значение true, гарантирует, что сообщения успешно создаются ровно один раз и в исходном порядке создания со значением по умолчанию false
MessageTimeoutMs (Необязательно) Время ожидания локального сообщения в миллисекундах. Это значение применяется только локально и ограничивает время ожидания сообщением успешной доставки с заданным по умолчанию 300000. Время 0 не ограничено. Это максимальное время, используемое для доставки сообщения (включая повторные попытки). Ошибка доставки возникает при превышении количества повторных попыток или превышения времени ожидания сообщения.
requestTimeoutMs (Необязательно) Время ожидания подтверждения для выходного запроса в миллисекундах со значением по умолчанию 5000.
maxRetries (Необязательно) Число повторных попыток отправки неудачного сообщения со значением по умолчанию 2. Повторная попытка может привести к переупорядочению, если EnableIdempotence не задано как true.
authenticationMode (Необязательно) Режим, используемый при простой проверке подлинности и проверке подлинности уровня безопасности (SASL). Поддерживаемые значения: Gssapi, Plain (по умолчанию), ScramSha256, ScramSha512.
username (Необязательно) Имя пользователя для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi. Дополнительные сведения см. в разделе Подключения.
пароль (Необязательно) Пароль для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi. Дополнительные сведения см. в разделе Подключения.
protocol (Необязательно) Протокол безопасности, используемый при взаимодействии с брокерами. Поддерживаемые значения: plaintext (по умолчанию), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Необязательно) Путь к файлу сертификата ЦС для проверки сертификата брокера.
sslCertificateLocation (Необязательно) Путь к сертификату клиента.
sslKeyLocation (Необязательно) Путь к закрытому ключу клиента (PEM), используемому для проверки подлинности.
sslKeyPassword (Необязательно) Пароль для сертификата клиента.

Настройка

В следующей таблице описываются свойства конфигурации привязки, которые задаются в файле function.json.

Свойство в function.json Описание
type Должен иметь значениеkafka.
direction Должен иметь значениеout.
name Имя переменной, представляющей данные, полученные через брокер, в коде функции.
brokerList (Обязательно) Список брокеров Kafka, на которые отправляются выходные данные. Дополнительные сведения см. в разделе Подключения.
topic (Обязательно) Раздел, в который направляются выходные данные.
avroSchema (Необязательно) Схема универсальной записи при использовании протокола Avro.
maxMessageBytes (Необязательно) Максимальный размер отправляемого выходного сообщения (в МБ) со значением 1 по умолчанию.
batchSize (Необязательно) Максимальное количество сообщений, пакетированных в одном наборе сообщений, со значением 10000 по умолчанию.
EnableIdempotence (Необязательно) Если задано значение true, гарантирует, что сообщения успешно создаются ровно один раз и в исходном порядке создания со значением по умолчанию false
MessageTimeoutMs (Необязательно) Время ожидания локального сообщения в миллисекундах. Это значение применяется только локально и ограничивает время ожидания сообщением успешной доставки с заданным по умолчанию 300000. Время 0 не ограничено. Это максимальное время, используемое для доставки сообщения (включая повторные попытки). Ошибка доставки возникает при превышении количества повторных попыток или превышения времени ожидания сообщения.
requestTimeoutMs (Необязательно) Время ожидания подтверждения для выходного запроса в миллисекундах со значением по умолчанию 5000.
maxRetries (Необязательно) Число повторных попыток отправки неудачного сообщения со значением по умолчанию 2. Повторная попытка может привести к переупорядочению, если EnableIdempotence не задано как true.
authenticationMode (Необязательно) Режим, используемый при простой проверке подлинности и проверке подлинности уровня безопасности (SASL). Поддерживаемые значения: Gssapi, Plain (по умолчанию), ScramSha256, ScramSha512.
username (Необязательно) Имя пользователя для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi. Дополнительные сведения см. в разделе Подключения.
пароль (Необязательно) Пароль для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi. Дополнительные сведения см. в разделе Подключения.
protocol (Необязательно) Протокол безопасности, используемый при взаимодействии с брокерами. Поддерживаемые значения: plaintext (по умолчанию), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Необязательно) Путь к файлу сертификата ЦС для проверки сертификата брокера.
sslCertificateLocation (Необязательно) Путь к сертификату клиента.
sslKeyLocation (Необязательно) Путь к закрытому ключу клиента (PEM), используемому для проверки подлинности.
sslKeyPassword (Необязательно) Пароль для сертификата клиента.

Использование

Оба типа ключей и значений поддерживаются при встроенной сериализации Avro и Protobuf.

Смещение, секция и метка времени для события создаются в ходе выполнения. В функции можно задать только значения и заголовки. Раздел задается в файле function.json.

Убедитесь, что у вас есть доступ к разделу Kafka, в котором вы пытаетесь сделать запись. Привязка настраивается с учетными данными доступа и подключения к разделу Kafka.

В плане "Премиум" необходимо включить мониторинг масштабирования среды выполнения для выходных данных Kafka, чтобы иметь возможность горизонтально увеличить масштаб до нескольких экземпляров. Дополнительные сведения см. в статье Включение масштабирования среды выполнения.

Полный набор поддерживаемых параметров host.json для триггера Kafka см. в статье Параметры host.json.

Связи

Все сведения о подключении, необходимые триггерам и привязкам, должны храниться в параметрах приложения, а не в определениях привязок в коде. Это верно для учетных данных, которые никогда не должны храниться в коде.

Внимание

Параметры учетных данных должны ссылаться на параметр приложения. Не прописывайте учетные данные в файлах кода или конфигурации. При локальном запуске используйте файл local.settings.json для учетных данных и не публикуйте его.

При подключении к управляемому кластеру Kafka, предоставляемому Confluent в Azure, убедитесь, что в триггере или привязке заданы следующие учетные данные проверки подлинности для среды Confluent Cloud:

Параметр Рекомендуемое значение Description
BrokerList BootstrapServer Параметр приложения с именем BootstrapServer содержит значение сервера начальной загрузки, найденное на странице параметров Confluent Cloud. Значение напоминает xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Username ConfluentCloudUsername Параметр приложения с именем ConfluentCloudUsername содержит ключ доступа API, полученный от веб-сайта Confluent Cloud.
Пароль ConfluentCloudPassword Параметр приложения с именем ConfluentCloudPassword содержит секрет API, полученный от веб-сайта Confluent Cloud.

Строковые значения, используемые для этих параметров, должны присутствовать в качестве параметров приложения в Azure или коллекции Values в файле local.settings.json во время локальной разработки.

Необходимо также задать Protocol, AuthenticationMode и SslCaLocation в определениях привязок.

Следующие шаги