Привязка для вывода Apache Kafka для службы "Функции Azure"
Выходная привязка позволяет приложению Функции Azure записывать сообщения в раздел Kafka.
Внимание
Привязки Kafka доступны только для Функций в составе эластичного плана "Премиум" и плана "Выделенный (Служба приложений)". Они поддерживаются только в среде выполнения Функций версии 3.x и выше.
Пример
Использование привязки зависит от модальности C#, используемой в приложении-функции. Это может быть один из следующих вариантов:
Изолированная библиотека классов рабочих процессов, скомпилированная функция C# выполняется в процессе, изолированном от среды выполнения.
Используемые атрибуты зависят от конкретного поставщика событий.
В следующем примере имеется пользовательский тип возвращаемого значения MultipleOutputType
, который состоит из HTTP-ответа и выходных данных Kafka.
[Function("KafkaOutput")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = message,
HttpResponse = response
};
}
В классе MultipleOutputType
Kevent
— это переменная выходной привязки для привязки Kafka.
public class MultipleOutputType
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string Kevent { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
Чтобы отправить пакет событий, передайте массив строк в тип выходных данных, как показано в следующем примере:
[Function("KafkaOutputMany")]
public static MultipleOutputTypeForBatch Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
var response = req.CreateResponse(HttpStatusCode.OK);
string[] messages = new string[2];
messages[0] = "one";
messages[1] = "two";
return new MultipleOutputTypeForBatch()
{
Kevents = messages,
HttpResponse = response
};
}
Массив строк определяется как Kevents
свойство класса, для которого определена выходная привязка:
public class MultipleOutputTypeForBatch
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string[] Kevents { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
Следующая функция добавляет заголовки в выходные данные Kafka:
[Function("KafkaOutputWithHeaders")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = kevent,
HttpResponse = response
};
}
Полный набор рабочих примеров .NET см. в репозитории расширений Kafka.
Примечание.
Эквивалентный набор примеров TypeScript см. в репозитории расширений Kafka
Конкретные свойства файла function.json зависят от поставщика событий. В данных примерах это Confluent или Центры событий Azure. В следующих примерах показана выходная привязка Kafka для функции, которая активируется HTTP-запросом и отправляет данные из запроса в раздел Kafka.
Следующий файл function.json определяет триггер для конкретного поставщика в этих примерах:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "ConfluentCloudUsername",
"password": "ConfluentCloudPassword",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}
Затем следующий код отправляет сообщение в раздел:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message);
context.bindings.outputKafkaMessage = message;
context.res = {
// status: 200, /* Defaults to 200 */
body: 'Ok'
};
}
Следующий код отправляет несколько сообщений в виде массива в один и тот же раздел:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
context.bindings.outputKafkaMessages = ["one", "two"];
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
В следующем примере показано, как отправить сообщение о событии с заголовками в тот же раздел Kafka:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message || (req.body && req.body.message));
const responseMessage = message
? "Message received: " + message + ". The message transfered to the kafka broker."
: "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
Полный набор рабочих примеров JavaScript см. в репозитории расширений Kafka.
Конкретные свойства файла function.json зависят от поставщика событий. В данных примерах это Confluent или Центры событий Azure. В следующих примерах показана выходная привязка Kafka для функции, которая активируется HTTP-запросом и отправляет данные из запроса в раздел Kafka.
Следующий файл function.json определяет триггер для конкретного поставщика в этих примерах:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "Request",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username" : "%ConfluentCloudUserName%",
"password" : "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "Response"
}
]
}
Затем следующий код отправляет сообщение в раздел:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
$message
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
Следующий код отправляет несколько сообщений в виде массива в один и тот же раздел:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
В следующем примере показано, как отправить сообщение о событии с заголовками в тот же раздел Kafka:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
$message = $Request.Body.Message
}
$kevent = @{
Offset = 364
Partition = 0
Topic = "kafkaeventhubtest1"
Timestamp = "2022-04-09T03:20:06.591Z"
Value = $message
Headers= @(@{
Key= "test"
Value= "powershell"
}
)
}
Push-OutputBinding -Name Message -Value $kevent
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
Body = 'ok'
})
Полный набор рабочих примеров PowerShell см. в репозитории расширений Kafka.
Конкретные свойства файла function.json зависят от поставщика событий. В данных примерах это Confluent или Центры событий Azure. В следующих примерах показана выходная привязка Kafka для функции, которая активируется HTTP-запросом и отправляет данные из запроса в раздел Kafka.
Следующий файл function.json определяет триггер для конкретного поставщика в этих примерах:
{
"scriptFile": "main.py",
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"direction": "out",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
Затем следующий код отправляет сообщение в раздел:
import logging
import azure.functions as func
def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
input_msg = req.params.get('message')
outputMessage.set(input_msg)
return 'OK'
Следующий код отправляет несколько сообщений в виде массива в один и тот же раздел:
import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json
def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
outputMessage.set(['one', 'two'])
return 'OK'
В следующем примере показано, как отправить сообщение о событии с заголовками в тот же раздел Kafka:
import logging
import azure.functions as func
import json
def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
message = req.params.get('message')
kevent = { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
out.set(json.dumps(kevent))
return 'OK'
Полный набор рабочих примеров Python см. в репозитории расширений Kafka.
Заметки, используемые для настройки выходной привязки, зависят от конкретного поставщика событий.
Следующая функция отправляет сообщение в раздел Kafka.
@FunctionName("KafkaOutput")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
context.getLogger().info("Message:" + message);
output.setValue(message);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
В следующем примере показано, как отправить несколько сообщений в раздел Kafka.
@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String[]> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
String[] messages = new String[2];
messages[0] = "one";
messages[1] = "two";
output.setValue(messages);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
В этом примере параметр выходной привязки изменяется на массив строк.
Последний пример используется для следующих классов KafkaEntity
и KafkaHeader
:
public class KafkaEntity {
public int Offset;
public int Partition;
public String Timestamp;
public String Topic;
public String Value;
public KafkaHeaders Headers[];
public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
this.Offset = Offset;
this.Partition = Partition;
this.Topic = Topic;
this.Timestamp = Timestamp;
this.Value = Value;
this.Headers = headers;
}
public class KafkaHeaders{
public String Key;
public String Value;
public KafkaHeaders(String key, String value) {
this.Key = key;
this.Value = value;
}
В следующем примере функция отправляет сообщение с заголовками в раздел Kafka.
@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<KafkaEntity> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
KafkaHeaders[] headers = new KafkaHeaders[1];
headers[0] = new KafkaHeaders("test", "java");
KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
output.setValue(kevent);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
Полный набор рабочих примеров Java для Confluent см. в репозитории расширений Kafka.
Атрибуты
Библиотеки C# в процессе и изолированном рабочем процессе используют Kafka
атрибут для определения триггера функции.
В следующей таблице описаны свойства, которые можно задать с помощью этого атрибута:
Параметр | Описание |
---|---|
BrokerList | (Обязательно) Список брокеров Kafka, на которые отправляются выходные данные. Дополнительные сведения см. в разделе Подключения. |
Раздел | (Обязательно) Раздел, в который направляются выходные данные. |
AvroSchema | (Необязательно) Схема универсальной записи при использовании протокола Avro. |
MaxMessageBytes | (Необязательно) Максимальный размер отправляемого выходного сообщения (в МБ) со значением 1 по умолчанию. |
BatchSize | (Необязательно) Максимальное количество сообщений, пакетированных в одном наборе сообщений, со значением 10000 по умолчанию. |
EnableIdempotence | (Необязательно) Если задано значение true , гарантирует, что сообщения успешно создаются ровно один раз и в исходном порядке создания со значением по умолчанию false |
MessageTimeoutMs | (Необязательно) Время ожидания локального сообщения в миллисекундах. Это значение применяется только локально и ограничивает время ожидания сообщением успешной доставки с заданным по умолчанию 300000 . Время 0 не ограничено. Это значение — максимальное время, используемое для доставки сообщения (включая повторные попытки). Ошибка доставки возникает при превышении количества повторных попыток или превышения времени ожидания сообщения. |
RequestTimeoutMs | (Необязательно) Время ожидания подтверждения для выходного запроса в миллисекундах со значением по умолчанию 5000 . |
MaxRetries | (Необязательно) Число повторных попыток отправки неудачного сообщения со значением по умолчанию 2 . Повторная попытка может привести к переупорядочению, если EnableIdempotence не задано как true . |
AuthenticationMode | (Необязательно) Режим, используемый при простой проверке подлинности и проверке подлинности уровня безопасности (SASL). Поддерживаемые значения: Gssapi , Plain (по умолчанию), ScramSha256 , ScramSha512 . |
Username | (Необязательно) Имя пользователя для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi . Дополнительные сведения см. в разделе Подключения. |
Пароль | (Необязательно) Пароль для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi . Дополнительные сведения см. в разделе Подключения. |
Протокол | (Необязательно) Протокол безопасности, используемый при взаимодействии с брокерами. Поддерживаемые значения: plaintext (по умолчанию), ssl , sasl_plaintext , sasl_ssl . |
SslCaLocation | (Необязательно) Путь к файлу сертификата ЦС для проверки сертификата брокера. |
SslCertificateLocation | (Необязательно) Путь к сертификату клиента. |
SslKeyLocation | (Необязательно) Путь к закрытому ключу клиента (PEM), используемому для проверки подлинности. |
SslKeyPassword | (Необязательно) Пароль для сертификата клиента. |
Заметки
Заметка KafkaOutput
позволяет создать функцию, которая записывает данные в определенный раздел. Поддерживаемые варианты включают следующие элементы:
Элемент | Description |
---|---|
name | Имя переменной, представляющей данные, полученные через брокер, в коде функции. |
brokerList | (Обязательно) Список брокеров Kafka, на которые отправляются выходные данные. Дополнительные сведения см. в разделе Подключения. |
topic | (Обязательно) Раздел, в который направляются выходные данные. |
dataType | Определяет, как Функции обрабатывают значение параметра. По умолчанию полученное значение представляет собой строку, и Функции пытаются десериализовать ее до объекта POJO. При string входные данные обрабатываются просто как строка. При binary сообщение приходит в формате двоичных данных и Функции пытаются десериализовать его до фактического типа параметра byte[]. |
avroSchema | (Необязательно) Схема универсальной записи при использовании протокола Avro. (В настоящее время не поддерживается для Java.) |
maxMessageBytes | (Необязательно) Максимальный размер отправляемого выходного сообщения (в МБ) со значением 1 по умолчанию. |
batchSize | (Необязательно) Максимальное количество сообщений, пакетированных в одном наборе сообщений, со значением 10000 по умолчанию. |
EnableIdempotence | (Необязательно) Если задано значение true , гарантирует, что сообщения успешно создаются ровно один раз и в исходном порядке создания со значением по умолчанию false |
MessageTimeoutMs | (Необязательно) Время ожидания локального сообщения в миллисекундах. Это значение применяется только локально и ограничивает время ожидания сообщением успешной доставки с заданным по умолчанию 300000 . Время 0 не ограничено. Это максимальное время, используемое для доставки сообщения (включая повторные попытки). Ошибка доставки возникает при превышении количества повторных попыток или превышения времени ожидания сообщения. |
requestTimeoutMs | (Необязательно) Время ожидания подтверждения для выходного запроса в миллисекундах со значением по умолчанию 5000 . |
maxRetries | (Необязательно) Число повторных попыток отправки неудачного сообщения со значением по умолчанию 2 . Повторная попытка может привести к переупорядочению, если EnableIdempotence не задано как true . |
authenticationMode | (Необязательно) Режим, используемый при простой проверке подлинности и проверке подлинности уровня безопасности (SASL). Поддерживаемые значения: Gssapi , Plain (по умолчанию), ScramSha256 , ScramSha512 . |
username | (Необязательно) Имя пользователя для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi . Дополнительные сведения см. в разделе Подключения. |
пароль | (Необязательно) Пароль для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi . Дополнительные сведения см. в разделе Подключения. |
protocol | (Необязательно) Протокол безопасности, используемый при взаимодействии с брокерами. Поддерживаемые значения: plaintext (по умолчанию), ssl , sasl_plaintext , sasl_ssl . |
sslCaLocation | (Необязательно) Путь к файлу сертификата ЦС для проверки сертификата брокера. |
sslCertificateLocation | (Необязательно) Путь к сертификату клиента. |
sslKeyLocation | (Необязательно) Путь к закрытому ключу клиента (PEM), используемому для проверки подлинности. |
sslKeyPassword | (Необязательно) Пароль для сертификата клиента. |
Настройка
В следующей таблице описываются свойства конфигурации привязки, которые задаются в файле function.json.
Свойство в function.json | Описание |
---|---|
type | Должен иметь значениеkafka . |
direction | Должен иметь значениеout . |
name | Имя переменной, представляющей данные, полученные через брокер, в коде функции. |
brokerList | (Обязательно) Список брокеров Kafka, на которые отправляются выходные данные. Дополнительные сведения см. в разделе Подключения. |
topic | (Обязательно) Раздел, в который направляются выходные данные. |
avroSchema | (Необязательно) Схема универсальной записи при использовании протокола Avro. |
maxMessageBytes | (Необязательно) Максимальный размер отправляемого выходного сообщения (в МБ) со значением 1 по умолчанию. |
batchSize | (Необязательно) Максимальное количество сообщений, пакетированных в одном наборе сообщений, со значением 10000 по умолчанию. |
EnableIdempotence | (Необязательно) Если задано значение true , гарантирует, что сообщения успешно создаются ровно один раз и в исходном порядке создания со значением по умолчанию false |
MessageTimeoutMs | (Необязательно) Время ожидания локального сообщения в миллисекундах. Это значение применяется только локально и ограничивает время ожидания сообщением успешной доставки с заданным по умолчанию 300000 . Время 0 не ограничено. Это максимальное время, используемое для доставки сообщения (включая повторные попытки). Ошибка доставки возникает при превышении количества повторных попыток или превышения времени ожидания сообщения. |
requestTimeoutMs | (Необязательно) Время ожидания подтверждения для выходного запроса в миллисекундах со значением по умолчанию 5000 . |
maxRetries | (Необязательно) Число повторных попыток отправки неудачного сообщения со значением по умолчанию 2 . Повторная попытка может привести к переупорядочению, если EnableIdempotence не задано как true . |
authenticationMode | (Необязательно) Режим, используемый при простой проверке подлинности и проверке подлинности уровня безопасности (SASL). Поддерживаемые значения: Gssapi , Plain (по умолчанию), ScramSha256 , ScramSha512 . |
username | (Необязательно) Имя пользователя для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi . Дополнительные сведения см. в разделе Подключения. |
пароль | (Необязательно) Пароль для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi . Дополнительные сведения см. в разделе Подключения. |
protocol | (Необязательно) Протокол безопасности, используемый при взаимодействии с брокерами. Поддерживаемые значения: plaintext (по умолчанию), ssl , sasl_plaintext , sasl_ssl . |
sslCaLocation | (Необязательно) Путь к файлу сертификата ЦС для проверки сертификата брокера. |
sslCertificateLocation | (Необязательно) Путь к сертификату клиента. |
sslKeyLocation | (Необязательно) Путь к закрытому ключу клиента (PEM), используемому для проверки подлинности. |
sslKeyPassword | (Необязательно) Пароль для сертификата клиента. |
Использование
Смещение, секция и метка времени для события создаются в ходе выполнения. В функции можно задать только значения и заголовки. Раздел задается в файле function.json.
Убедитесь, что у вас есть доступ к разделу Kafka, в котором вы пытаетесь сделать запись. Привязка настраивается с учетными данными доступа и подключения к разделу Kafka.
В плане "Премиум" необходимо включить мониторинг масштабирования среды выполнения для выходных данных Kafka, чтобы иметь возможность горизонтально увеличить масштаб до нескольких экземпляров. Дополнительные сведения см. в статье Включение масштабирования среды выполнения.
Полный набор поддерживаемых параметров host.json для триггера Kafka см. в статье Параметры host.json.
Связи
Все сведения о подключении, необходимые триггерам и привязкам, должны храниться в параметрах приложения, а не в определениях привязок в коде. Это верно для учетных данных, которые никогда не должны храниться в коде.
Внимание
Параметры учетных данных должны ссылаться на параметр приложения. Не прописывайте учетные данные в файлах кода или конфигурации. При локальном запуске используйте файл local.settings.json для учетных данных и не публикуйте его.
При подключении к управляемому кластеру Kafka, предоставляемому Confluent в Azure, убедитесь, что в триггере или привязке заданы следующие учетные данные проверки подлинности для среды Confluent Cloud:
Параметр | Рекомендуемое значение | Description |
---|---|---|
BrokerList | BootstrapServer |
Параметр приложения с именем BootstrapServer содержит значение сервера начальной загрузки, найденное на странице параметров Confluent Cloud. Значение напоминает xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 . |
Username | ConfluentCloudUsername |
Параметр приложения с именем ConfluentCloudUsername содержит ключ доступа API, полученный от веб-сайта Confluent Cloud. |
Пароль | ConfluentCloudPassword |
Параметр приложения с именем ConfluentCloudPassword содержит секрет API, полученный от веб-сайта Confluent Cloud. |
Строковые значения, используемые для этих параметров, должны присутствовать в качестве параметров приложения в Azure или коллекции Values
в файле local.settings.json во время локальной разработки.
Необходимо также задать Protocol
, AuthenticationMode
и SslCaLocation
в определениях привязок.