Powiązanie wyjściowe platformy Apache Kafka dla usługi Azure Functions
Powiązanie wyjściowe umożliwia aplikacji usługi Azure Functions zapisywanie komunikatów w temacie platformy Kafka.
Ważne
Powiązania platformy Kafka są dostępne tylko dla funkcji w ramach planu Elastic Premium i dedykowanego (App Service). Są one obsługiwane tylko w wersji 3.x i nowszej środowiska uruchomieniowego usługi Functions.
Przykład
Użycie powiązania zależy od modalności języka C# używanej w aplikacji funkcji, co może być jednym z następujących elementów:
Izolowana biblioteka klas procesów roboczych skompilowana funkcja języka C# jest uruchamiana w procesie odizolowanym od środowiska uruchomieniowego.
Używane atrybuty zależą od określonego dostawcy zdarzeń.
Poniższy przykład ma niestandardowy typ zwracany, czyli MultipleOutputType
, który składa się z odpowiedzi HTTP i danych wyjściowych platformy Kafka.
[Function("KafkaOutput")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = message,
HttpResponse = response
};
}
W klasie MultipleOutputType
Kevent
jest zmienną powiązania wyjściowego dla powiązania platformy Kafka.
public class MultipleOutputType
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string Kevent { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
Aby wysłać partię zdarzeń, przekaż tablicę ciągów do typu danych wyjściowych, jak pokazano w poniższym przykładzie:
[Function("KafkaOutputMany")]
public static MultipleOutputTypeForBatch Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
var response = req.CreateResponse(HttpStatusCode.OK);
string[] messages = new string[2];
messages[0] = "one";
messages[1] = "two";
return new MultipleOutputTypeForBatch()
{
Kevents = messages,
HttpResponse = response
};
}
Tablica ciągów jest definiowana jako Kevents
właściwość klasy, na której zdefiniowano powiązanie wyjściowe:
public class MultipleOutputTypeForBatch
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string[] Kevents { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
Następująca funkcja dodaje nagłówki do danych wyjściowych platformy Kafka:
[Function("KafkaOutputWithHeaders")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = kevent,
HttpResponse = response
};
}
Pełny zestaw działających przykładów platformy .NET można znaleźć w repozytorium rozszerzeń platformy Kafka.
Uwaga
Aby zapoznać się z równoważnym zestawem przykładów języka TypeScript, zobacz repozytorium rozszerzeń platformy Kafka
Określone właściwości pliku function.json zależą od dostawcy zdarzeń, który w tych przykładach to Confluent lub Azure Event Hubs. W poniższych przykładach pokazano powiązanie danych wyjściowych platformy Kafka dla funkcji wyzwalanej przez żądanie HTTP i wysyłanie danych z żądania do tematu platformy Kafka.
W poniższych function.json zdefiniowano wyzwalacz dla określonego dostawcy w następujących przykładach:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "ConfluentCloudUsername",
"password": "ConfluentCloudPassword",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}
Poniższy kod wysyła następnie komunikat do tematu:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message);
context.bindings.outputKafkaMessage = message;
context.res = {
// status: 200, /* Defaults to 200 */
body: 'Ok'
};
}
Poniższy kod wysyła wiele komunikatów jako tablicę do tego samego tematu:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
context.bindings.outputKafkaMessages = ["one", "two"];
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
W poniższym przykładzie pokazano, jak wysłać komunikat zdarzenia z nagłówkami do tego samego tematu platformy Kafka:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message || (req.body && req.body.message));
const responseMessage = message
? "Message received: " + message + ". The message transfered to the kafka broker."
: "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
Pełny zestaw działających przykładów języka JavaScript można znaleźć w repozytorium rozszerzeń platformy Kafka.
Określone właściwości pliku function.json zależą od dostawcy zdarzeń, który w tych przykładach to Confluent lub Azure Event Hubs. W poniższych przykładach pokazano powiązanie danych wyjściowych platformy Kafka dla funkcji wyzwalanej przez żądanie HTTP i wysyłanie danych z żądania do tematu platformy Kafka.
W poniższych function.json zdefiniowano wyzwalacz dla określonego dostawcy w następujących przykładach:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "Request",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username" : "%ConfluentCloudUserName%",
"password" : "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "Response"
}
]
}
Poniższy kod wysyła następnie komunikat do tematu:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
$message
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
Poniższy kod wysyła wiele komunikatów jako tablicę do tego samego tematu:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
W poniższym przykładzie pokazano, jak wysłać komunikat zdarzenia z nagłówkami do tego samego tematu platformy Kafka:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
$message = $Request.Body.Message
}
$kevent = @{
Offset = 364
Partition = 0
Topic = "kafkaeventhubtest1"
Timestamp = "2022-04-09T03:20:06.591Z"
Value = $message
Headers= @(@{
Key= "test"
Value= "powershell"
}
)
}
Push-OutputBinding -Name Message -Value $kevent
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
Body = 'ok'
})
Pełny zestaw działających przykładów programu PowerShell można znaleźć w repozytorium rozszerzeń platformy Kafka.
Określone właściwości pliku function.json zależą od dostawcy zdarzeń, który w tych przykładach to Confluent lub Azure Event Hubs. W poniższych przykładach pokazano powiązanie danych wyjściowych platformy Kafka dla funkcji wyzwalanej przez żądanie HTTP i wysyłanie danych z żądania do tematu platformy Kafka.
W poniższych function.json zdefiniowano wyzwalacz dla określonego dostawcy w następujących przykładach:
{
"scriptFile": "main.py",
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"direction": "out",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
Poniższy kod wysyła następnie komunikat do tematu:
import logging
import azure.functions as func
def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
input_msg = req.params.get('message')
outputMessage.set(input_msg)
return 'OK'
Poniższy kod wysyła wiele komunikatów jako tablicę do tego samego tematu:
import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json
def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
outputMessage.set(['one', 'two'])
return 'OK'
W poniższym przykładzie pokazano, jak wysłać komunikat zdarzenia z nagłówkami do tego samego tematu platformy Kafka:
import logging
import azure.functions as func
import json
def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
message = req.params.get('message')
kevent = { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
out.set(json.dumps(kevent))
return 'OK'
Pełny zestaw działających przykładów języka Python można znaleźć w repozytorium rozszerzeń platformy Kafka.
Adnotacje używane do konfigurowania powiązania wyjściowego zależą od określonego dostawcy zdarzeń.
Poniższa funkcja wysyła komunikat do tematu platformy Kafka.
@FunctionName("KafkaOutput")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
context.getLogger().info("Message:" + message);
output.setValue(message);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
W poniższym przykładzie pokazano, jak wysyłać wiele komunikatów do tematu platformy Kafka.
@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String[]> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
String[] messages = new String[2];
messages[0] = "one";
messages[1] = "two";
output.setValue(messages);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
W tym przykładzie parametr powiązania wyjściowego został zmieniony na tablicę ciągów.
W ostatnim przykładzie użyto następujących KafkaEntity
klas i KafkaHeader
:
public class KafkaEntity {
public int Offset;
public int Partition;
public String Timestamp;
public String Topic;
public String Value;
public KafkaHeaders Headers[];
public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
this.Offset = Offset;
this.Partition = Partition;
this.Topic = Topic;
this.Timestamp = Timestamp;
this.Value = Value;
this.Headers = headers;
}
public class KafkaHeaders{
public String Key;
public String Value;
public KafkaHeaders(String key, String value) {
this.Key = key;
this.Value = value;
}
Poniższa przykładowa funkcja wysyła komunikat z nagłówkami do tematu platformy Kafka.
@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<KafkaEntity> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
KafkaHeaders[] headers = new KafkaHeaders[1];
headers[0] = new KafkaHeaders("test", "java");
KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
output.setValue(kevent);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
Pełny zestaw działających przykładów języka Java dla platformy Confluent można znaleźć w repozytorium rozszerzeń platformy Kafka.
Atrybuty
Biblioteki języka C# procesu roboczego w procesie przetwarzania procesów przetwarzania procesów procesów przetwarzania w procesie przetwarzania izolowanego używają atrybutu Kafka
do zdefiniowania wyzwalacza funkcji.
W poniższej tabeli opisano właściwości, które można ustawić przy użyciu tego atrybutu:
Parametr | Opis |
---|---|
Lista brokerów | (Wymagane) Lista brokerów platformy Kafka, do których są wysyłane dane wyjściowe. Aby uzyskać więcej informacji, zobacz Połączenia . |
Temat | (Wymagane) Temat, do którego są wysyłane dane wyjściowe. |
AvroSchema | (Opcjonalnie) Schemat rekordu ogólnego w przypadku korzystania z protokołu Avro. |
MaxMessageBytes | (Opcjonalnie) Maksymalny rozmiar wysyłanego komunikatu wyjściowego (w MB) z wartością 1 domyślną . |
BatchSize | (Opcjonalnie) Maksymalna liczba komunikatów wsadowych w jednym zestawie komunikatów z wartością 10000 domyślną . |
EnableIdempotence | (Opcjonalnie) W przypadku ustawienia opcji true gwarantuje, że komunikaty są generowane dokładnie raz i w oryginalnej kolejności produkcji z wartością domyślną false |
MessageTimeoutMs | (Opcjonalnie) Limit czasu komunikatu lokalnego( w milisekundach). Ta wartość jest wymuszana tylko lokalnie i ogranicza czas oczekiwania wygenerowanego komunikatu na pomyślne dostarczenie z domyślną wartością 300000 . Czas 0 jest nieskończony. Ta wartość to maksymalny czas używany do dostarczenia komunikatu (w tym ponownych prób). Błąd dostarczania występuje, gdy liczba ponownych prób lub przekroczenie limitu czasu komunikatu. |
RequestTimeoutMs | (Opcjonalnie) Limit czasu potwierdzenia żądania wyjściowego (w milisekundach) z wartością domyślną 5000 . |
Maksymalna liczba ponownych prób | (Opcjonalnie) Liczba ponownych prób wysłania komunikatu zakończonego niepowodzeniem z wartością domyślną 2 . Ponawianie próby może spowodować zmiana kolejności, chyba że EnableIdempotence ustawiono wartość true . |
AuthenticationMode | (Opcjonalnie) Tryb uwierzytelniania podczas korzystania z uwierzytelniania prostego i warstwy zabezpieczeń (SASL). Obsługiwane wartości to Gssapi , Plain (wartość domyślna), ScramSha256 , ScramSha512 . |
Nazwa użytkownika | (Opcjonalnie) Nazwa użytkownika uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi . Aby uzyskać więcej informacji, zobacz Połączenia . |
Hasło | (Opcjonalnie) Hasło do uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi . Aby uzyskać więcej informacji, zobacz Połączenia . |
Protokół | (Opcjonalnie) Protokół zabezpieczeń używany podczas komunikacji z brokerami. Obsługiwane wartości to plaintext (wartość domyślna), ssl , sasl_plaintext , sasl_ssl . |
SslCaLocation | (Opcjonalnie) Ścieżka do pliku certyfikatu urzędu certyfikacji na potrzeby weryfikowania certyfikatu brokera. |
SslCertificateLocation | (Opcjonalnie) Ścieżka do certyfikatu klienta. |
SslKeyLocation | (Opcjonalnie) Ścieżka do klucza prywatnego klienta (PEM) używanego do uwierzytelniania. |
SslKeyPassword | (Opcjonalnie) Hasło do certyfikatu klienta. |
Adnotacje
Adnotacja KafkaOutput
umożliwia utworzenie funkcji, która zapisuje w określonym temacie. Obsługiwane opcje obejmują następujące elementy:
Element | opis |
---|---|
name | Nazwa zmiennej reprezentującej dane obsługiwane przez brokera w kodzie funkcji. |
brokerList | (Wymagane) Lista brokerów platformy Kafka, do których są wysyłane dane wyjściowe. Aby uzyskać więcej informacji, zobacz Połączenia . |
topic | (Wymagane) Temat, do którego są wysyłane dane wyjściowe. |
Datatype | Definiuje sposób obsługi wartości parametru przez funkcję Functions. Domyślnie wartość jest uzyskiwana jako ciąg, a usługa Functions próbuje wykonać deserializacji ciągu do rzeczywistego zwykłego obiektu Java (POJO). Gdy string element wejściowy jest traktowany jako tylko ciąg. Gdy binary komunikat zostanie odebrany jako dane binarne, a usługa Functions próbuje wykonać deserializacji go do rzeczywistego bajtu typu parametru[]. |
avroSchema | (Opcjonalnie) Schemat rekordu ogólnego w przypadku korzystania z protokołu Avro. (Obecnie nieobsługiwane dla języka Java). |
maxMessageBytes | (Opcjonalnie) Maksymalny rozmiar wysyłanego komunikatu wyjściowego (w MB) z wartością 1 domyślną . |
batchSize | (Opcjonalnie) Maksymalna liczba komunikatów wsadowych w jednym zestawie komunikatów z wartością 10000 domyślną . |
enableIdempotence | (Opcjonalnie) W przypadku ustawienia opcji true gwarantuje, że komunikaty są generowane dokładnie raz i w oryginalnej kolejności produkcji z wartością domyślną false |
messageTimeoutMs | (Opcjonalnie) Limit czasu komunikatu lokalnego( w milisekundach). Ta wartość jest wymuszana tylko lokalnie i ogranicza czas oczekiwania wygenerowanego komunikatu na pomyślne dostarczenie z domyślną wartością 300000 . Czas 0 jest nieskończony. Jest to maksymalny czas używany do dostarczania komunikatu (w tym ponownych prób). Błąd dostarczania występuje, gdy liczba ponownych prób lub przekroczenie limitu czasu komunikatu. |
requestTimeoutMs | (Opcjonalnie) Limit czasu potwierdzenia żądania wyjściowego (w milisekundach) z wartością domyślną 5000 . |
maxRetries | (Opcjonalnie) Liczba ponownych prób wysłania komunikatu zakończonego niepowodzeniem z wartością domyślną 2 . Ponawianie próby może spowodować zmiana kolejności, chyba że EnableIdempotence ustawiono wartość true . |
authenticationMode | (Opcjonalnie) Tryb uwierzytelniania podczas korzystania z uwierzytelniania prostego i warstwy zabezpieczeń (SASL). Obsługiwane wartości to Gssapi , Plain (wartość domyślna), ScramSha256 , ScramSha512 . |
nazwa użytkownika | (Opcjonalnie) Nazwa użytkownika uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi . Aby uzyskać więcej informacji, zobacz Połączenia . |
hasło | (Opcjonalnie) Hasło do uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi . Aby uzyskać więcej informacji, zobacz Połączenia . |
protokół | (Opcjonalnie) Protokół zabezpieczeń używany podczas komunikacji z brokerami. Obsługiwane wartości to plaintext (wartość domyślna), ssl , sasl_plaintext , sasl_ssl . |
sslCaLocation | (Opcjonalnie) Ścieżka do pliku certyfikatu urzędu certyfikacji na potrzeby weryfikowania certyfikatu brokera. |
sslCertificateLocation | (Opcjonalnie) Ścieżka do certyfikatu klienta. |
sslKeyLocation | (Opcjonalnie) Ścieżka do klucza prywatnego klienta (PEM) używanego do uwierzytelniania. |
sslKeyPassword | (Opcjonalnie) Hasło do certyfikatu klienta. |
Konfigurowanie
W poniższej tabeli opisano właściwości konfiguracji powiązania ustawione w pliku function.json .
właściwość function.json | opis |
---|---|
type | Musi być ustawiona wartość kafka . |
direction | Musi być ustawiona wartość out . |
name | Nazwa zmiennej reprezentującej dane obsługiwane przez brokera w kodzie funkcji. |
brokerList | (Wymagane) Lista brokerów platformy Kafka, do których są wysyłane dane wyjściowe. Aby uzyskać więcej informacji, zobacz Połączenia . |
topic | (Wymagane) Temat, do którego są wysyłane dane wyjściowe. |
avroSchema | (Opcjonalnie) Schemat rekordu ogólnego w przypadku korzystania z protokołu Avro. |
maxMessageBytes | (Opcjonalnie) Maksymalny rozmiar wysyłanego komunikatu wyjściowego (w MB) z wartością 1 domyślną . |
batchSize | (Opcjonalnie) Maksymalna liczba komunikatów wsadowych w jednym zestawie komunikatów z wartością 10000 domyślną . |
enableIdempotence | (Opcjonalnie) W przypadku ustawienia opcji true gwarantuje, że komunikaty są generowane dokładnie raz i w oryginalnej kolejności produkcji z wartością domyślną false |
messageTimeoutMs | (Opcjonalnie) Limit czasu komunikatu lokalnego( w milisekundach). Ta wartość jest wymuszana tylko lokalnie i ogranicza czas oczekiwania wygenerowanego komunikatu na pomyślne dostarczenie z domyślną wartością 300000 . Czas 0 jest nieskończony. Jest to maksymalny czas używany do dostarczania komunikatu (w tym ponownych prób). Błąd dostarczania występuje, gdy liczba ponownych prób lub przekroczenie limitu czasu komunikatu. |
requestTimeoutMs | (Opcjonalnie) Limit czasu potwierdzenia żądania wyjściowego (w milisekundach) z wartością domyślną 5000 . |
maxRetries | (Opcjonalnie) Liczba ponownych prób wysłania komunikatu zakończonego niepowodzeniem z wartością domyślną 2 . Ponawianie próby może spowodować zmiana kolejności, chyba że EnableIdempotence ustawiono wartość true . |
authenticationMode | (Opcjonalnie) Tryb uwierzytelniania podczas korzystania z uwierzytelniania prostego i warstwy zabezpieczeń (SASL). Obsługiwane wartości to Gssapi , Plain (wartość domyślna), ScramSha256 , ScramSha512 . |
nazwa użytkownika | (Opcjonalnie) Nazwa użytkownika uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi . Aby uzyskać więcej informacji, zobacz Połączenia . |
hasło | (Opcjonalnie) Hasło do uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi . Aby uzyskać więcej informacji, zobacz Połączenia . |
protokół | (Opcjonalnie) Protokół zabezpieczeń używany podczas komunikacji z brokerami. Obsługiwane wartości to plaintext (wartość domyślna), ssl , sasl_plaintext , sasl_ssl . |
sslCaLocation | (Opcjonalnie) Ścieżka do pliku certyfikatu urzędu certyfikacji na potrzeby weryfikowania certyfikatu brokera. |
sslCertificateLocation | (Opcjonalnie) Ścieżka do certyfikatu klienta. |
sslKeyLocation | (Opcjonalnie) Ścieżka do klucza prywatnego klienta (PEM) używanego do uwierzytelniania. |
sslKeyPassword | (Opcjonalnie) Hasło do certyfikatu klienta. |
Użycie
Zarówno klucze, jak i typy wartości są obsługiwane z wbudowaną serializacji Avro i Protobuf .
Przesunięcie, partycja i sygnatura czasowa zdarzenia są generowane w czasie wykonywania. W funkcji można ustawić tylko wartości i nagłówki. Temat jest ustawiony w function.json.
Upewnij się, że masz dostęp do tematu platformy Kafka, do którego próbujesz napisać. Powiązanie należy skonfigurować przy użyciu poświadczeń dostępu i połączenia do tematu platformy Kafka.
W planie Premium należy włączyć monitorowanie skalowania w czasie wykonywania dla danych wyjściowych platformy Kafka, aby móc skalować w poziomie do wielu wystąpień. Aby dowiedzieć się więcej, zobacz Włączanie skalowania środowiska uruchomieniowego.
Aby uzyskać pełny zestaw obsługiwanych ustawień host.json wyzwalacza platformy Kafka, zobacz host.json ustawienia.
Połączenia
Wszystkie informacje o połączeniu wymagane przez wyzwalacze i powiązania powinny być przechowywane w ustawieniach aplikacji, a nie w definicjach powiązań w kodzie. Dotyczy to poświadczeń, które nigdy nie powinny być przechowywane w kodzie.
Ważne
Ustawienia poświadczeń muszą odwoływać się do ustawienia aplikacji. Nie należy zapisywać poświadczeń w kodzie ani plikach konfiguracji. W przypadku uruchamiania lokalnego użyj pliku local.settings.json dla poświadczeń i nie publikuj pliku local.settings.json.
Podczas nawiązywania połączenia z zarządzanym klastrem Platformy Kafka udostępnianym przez platformę Confluent na platformie Azure upewnij się, że następujące poświadczenia uwierzytelniania dla środowiska platformy Confluent Cloud zostały ustawione w wyzwalaczu lub powiązaniu:
Ustawienie | Zalecana wartość | opis |
---|---|---|
Lista brokerów | BootstrapServer |
Ustawienie aplikacji o nazwie BootstrapServer zawiera wartość serwera bootstrap znalezionego na stronie ustawień chmury Confluent. Wartość przypomina xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 . |
Nazwa użytkownika | ConfluentCloudUsername |
Ustawienie aplikacji o nazwie ConfluentCloudUsername zawiera klucz dostępu interfejsu API z witryny internetowej Confluent Cloud. |
Hasło | ConfluentCloudPassword |
Ustawienie aplikacji o nazwie ConfluentCloudPassword zawiera wpis tajny interfejsu API uzyskany z witryny internetowej platformy Confluent Cloud. |
Wartości ciągu używane dla tych ustawień muszą być obecne jako ustawienia aplikacji na platformie Azure lub w Values
kolekcji w pliku local.settings.json podczas programowania lokalnego.
Należy również ustawić Protocol
definicje powiązań , AuthenticationMode
i SslCaLocation
.