Trigger Apache Kafka per Funzioni di Azure
È possibile usare il trigger Apache Kafka in Funzioni di Azure per eseguire il codice della funzione in risposta ai messaggi negli argomenti kafka. È anche possibile usare un'associazione di output Kafka per scrivere dalla funzione in un argomento. Per informazioni dettagliate sull'installazione e sulla configurazione, vedere Binding Apache Kafka per Funzioni di Azure panoramica.
Importante
Le associazioni Kafka sono disponibili solo per le funzioni nel piano Elastic Premium e dedicato (servizio app). Sono supportate solo nella versione 3.x e successiva del runtime di Funzioni.
Esempio
L'utilizzo del trigger dipende dalla modalità C# usata nell'app per le funzioni, che può essere una delle modalità seguenti:
Una libreria di classi di processo di lavoro isolata compilata C# viene eseguita in un processo isolato dal runtime.
Gli attributi usati dipendono dal provider di eventi specifico.
L'esempio seguente mostra una funzione C# che legge e registra il messaggio Kafka come evento Kafka:
[Function("KafkaTrigger")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}
Per ricevere eventi in un batch, usare una matrice di stringhe come input, come illustrato nell'esempio seguente:
[Function("KafkaTriggerMany")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default",
IsBatched = true)] string[] events, FunctionContext context)
{
foreach (var kevent in events)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
}
La funzione seguente registra il messaggio e le intestazioni per l'evento Kafka:
[Function("KafkaTriggerWithHeaders")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var eventJsonObject = JObject.Parse(eventData);
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
var headersJArr = eventJsonObject["Headers"] as JArray;
logger.LogInformation("Headers for this event: ");
foreach (JObject header in headersJArr)
{
logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");
}
}
Per un set completo di esempi .NET funzionanti, vedere il repository di estensioni Kafka.
Nota
Per un set equivalente di esempi typeScript, vedere il repository di estensioni Kafka
Le proprietà specifiche del file function.json dipendono dal provider di eventi, che in questi esempi sono Confluent o Hub eventi di Azure. Negli esempi seguenti viene illustrato un trigger Kafka per una funzione che legge e registra un messaggio Kafka.
L'function.json seguente definisce il trigger per il provider specifico:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"topic": "topic",
"brokerList": "%BrokerList%",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "saslSsl",
"authenticationMode": "plain",
"consumerGroup" : "$Default",
"dataType": "string"
}
]
}
Il codice seguente viene quindi eseguito quando viene attivata la funzione:
module.exports = async function (context, event) {
// context.log.info(event)
context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};
Per ricevere eventi in un batch, impostare il cardinality
valore su many
nel file function.json, come illustrato negli esempi seguenti:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%"
}
]
}
Il codice seguente analizza quindi la matrice di eventi e registra i dati dell'evento:
module.exports = async function (context, events) {
function print(event) {
var eventJson = JSON.parse(event)
context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
}
events.map(print);
};
Il codice seguente registra anche i dati dell'intestazione:
module.exports = async function (context, event) {
function print(kevent) {
var keventJson = JSON.parse(kevent)
context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
context.log.info(`Headers for this message:`)
let headers = keventJson.Headers;
headers.forEach(element => {
context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`)
});
}
event.map(print);
};
È possibile definire uno schema Avro generico per l'evento passato al trigger. L'function.json seguente definisce il trigger per il provider specifico con uno schema Avro generico:
{
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaAvroGenericSingle",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
Il codice seguente viene quindi eseguito quando viene attivata la funzione:
module.exports = async function (context, event) {
context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};
Per un set completo di esempi JavaScript funzionanti, vedere il repository di estensioni Kafka.
Le proprietà specifiche del file function.json dipendono dal provider di eventi, che in questi esempi sono Confluent o Hub eventi di Azure. Negli esempi seguenti viene illustrato un trigger Kafka per una funzione che legge e registra un messaggio Kafka.
L'function.json seguente definisce il trigger per il provider specifico:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
Il codice seguente viene quindi eseguito quando viene attivata la funzione:
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
Per ricevere eventi in un batch, impostare il cardinality
valore su many
nel file function.json, come illustrato negli esempi seguenti:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
Il codice seguente analizza quindi la matrice di eventi e registra i dati dell'evento:
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
$event = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $event.Value"
}
Il codice seguente registra anche i dati dell'intestazione:
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
foreach ($kafkaEvent in $kafkaEvents) {
$kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
Write-Output "Headers for this message:"
foreach ($header in $kevent.Headers) {
$DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
$Key = $header.Key
Write-Output "Key: $Key Value: $DecodedValue"
}
}
È possibile definire uno schema Avro generico per l'evento passato al trigger. L'function.json seguente definisce il trigger per il provider specifico con uno schema Avro generico:
{
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaEvent",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
Il codice seguente viene quindi eseguito quando viene attivata la funzione:
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
Per un set completo di esempi di PowerShell funzionanti, vedere il repository di estensioni Kafka.
Le proprietà specifiche del file function.json dipendono dal provider di eventi, che in questi esempi sono Confluent o Hub eventi di Azure. Negli esempi seguenti viene illustrato un trigger Kafka per una funzione che legge e registra un messaggio Kafka.
L'function.json seguente definisce il trigger per il provider specifico:
{
"scriptFile": "main.py",
"bindings": [
{
"type": "kafkaTrigger",
"name": "kevent",
"topic": "topic",
"brokerList": "%BrokerList%",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"consumerGroup" : "functions",
"protocol": "saslSsl",
"authenticationMode": "plain"
}
]
}
Il codice seguente viene quindi eseguito quando viene attivata la funzione:
import logging
from azure.functions import KafkaEvent
def main(kevent : KafkaEvent):
logging.info(kevent.get_body().decode('utf-8'))
logging.info(kevent.metadata)
Per ricevere eventi in un batch, impostare il cardinality
valore su many
nel file function.json, come illustrato negli esempi seguenti:
{
"scriptFile": "main.py",
"bindings": [
{
"type" : "kafkaTrigger",
"direction": "in",
"name" : "kevents",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"topic" : "message_python",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"dataType": "string",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"BrokerList" : "%BrokerList%"
}
]
}
Il codice seguente analizza quindi la matrice di eventi e registra i dati dell'evento:
import logging
import typing
from azure.functions import KafkaEvent
def main(kevents : typing.List[KafkaEvent]):
for event in kevents:
logging.info(event.get_body())
Il codice seguente registra anche i dati dell'intestazione:
import logging
import typing
from azure.functions import KafkaEvent
import json
import base64
def main(kevents : typing.List[KafkaEvent]):
for event in kevents:
event_dec = event.get_body().decode('utf-8')
event_json = json.loads(event_dec)
logging.info("Python Kafka trigger function called for message " + event_json["Value"])
headers = event_json["Headers"]
for header in headers:
logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))
È possibile definire uno schema Avro generico per l'evento passato al trigger. L'function.json seguente definisce il trigger per il provider specifico con uno schema Avro generico:
{
"scriptFile": "main.py",
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaTriggerAvroGeneric",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
Il codice seguente viene quindi eseguito quando viene attivata la funzione:
import logging
from azure.functions import KafkaEvent
def main(kafkaTriggerAvroGeneric : KafkaEvent):
logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
logging.info(kafkaTriggerAvroGeneric.metadata)
Per un set completo di esempi di Python funzionanti, vedere il repository di estensioni Kafka.
Le annotazioni usate per configurare il trigger dipendono dal provider di eventi specifico.
L'esempio seguente mostra una funzione Java che legge e registra il contenuto dell'evento Kafka:
@FunctionName("KafkaTrigger")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string"
) String kafkaEventData,
final ExecutionContext context) {
context.getLogger().info(kafkaEventData);
}
Per ricevere eventi in un batch, usare una stringa di input come matrice, come illustrato nell'esempio seguente:
@FunctionName("KafkaTriggerMany")
public void runMany(
@KafkaTrigger(
name = "kafkaTriggerMany",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
cardinality = Cardinality.MANY,
dataType = "string"
) String[] kafkaEvents,
final ExecutionContext context) {
for (String kevent: kafkaEvents) {
context.getLogger().info(kevent);
}
}
La funzione seguente registra il messaggio e le intestazioni per l'evento Kafka:
@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string",
cardinality = Cardinality.MANY
) List<String> kafkaEvents,
final ExecutionContext context) {
Gson gson = new Gson();
for (String keventstr: kafkaEvents) {
KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
context.getLogger().info("Headers for the message:");
for (KafkaHeaders header : kevent.Headers) {
String decodedValue = new String(Base64.getDecoder().decode(header.Value));
context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);
}
}
}
È possibile definire uno schema Avro generico per l'evento passato al trigger. La funzione seguente definisce un trigger per il provider specifico con uno schema Avro generico:
private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";
@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
@KafkaTrigger(
name = "kafkaAvroGenericSingle",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "ConfluentCloudUsername",
password = "ConfluentCloudPassword",
avroSchema = schema,
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL) Payment payment,
final ExecutionContext context) {
context.getLogger().info(payment.toString());
}
Per un set completo di esempi Java funzionanti per Confluent, vedere il repository di estensioni Kafka.
Attributi
Le librerie C# del processo di lavoro isolato e in-process usano KafkaTriggerAttribute
per definire il trigger di funzione.
La tabella seguente illustra le proprietà che è possibile impostare usando questo attributo trigger:
Parametro | Descrizione |
---|---|
BrokerList | (Obbligatorio) Elenco di broker Kafka monitorati dal trigger. Per altre informazioni, vedere Connessioni . |
Argomento | (Obbligatorio) Argomento monitorato dal trigger. |
ConsumerGroup | (Facoltativo) Gruppo di consumer Kafka usato dal trigger. |
AvroSchema | (Facoltativo) Schema di un record generico quando si usa il protocollo Avro. |
AuthenticationMode | (Facoltativo) Modalità di autenticazione quando si usa l'autenticazione SASL (Simple Authentication and Security Layer). I valori supportati sono Gssapi , Plain (impostazione predefinita), ScramSha256 , ScramSha512 . |
Nome utente | (Facoltativo) Nome utente per l'autenticazione SASL. Non supportato quando AuthenticationMode è Gssapi . Per altre informazioni, vedere Connessioni . |
Password | (Facoltativo) Password per l'autenticazione SASL. Non supportato quando AuthenticationMode è Gssapi . Per altre informazioni, vedere Connessioni . |
Protocollo | (Facoltativo) Protocollo di sicurezza usato per la comunicazione con i broker. I valori supportati sono plaintext (impostazione predefinita), ssl , , sasl_plaintext sasl_ssl . |
SslCaLocation | (Facoltativo) Percorso del file di certificato DELLA CA per la verifica del certificato del broker. |
SslCertificateLocation | (Facoltativo) Percorso del certificato del client. |
SslKeyLocation | (Facoltativo) Percorso della chiave privata del client (PEM) usato per l'autenticazione. |
SslKeyPassword | (Facoltativo) Password per il certificato del client. |
Annotazioni
L'annotazione KafkaTrigger
consente di creare una funzione che viene eseguita quando viene ricevuto un argomento. Le opzioni supportate includono gli elementi seguenti:
Elemento | Descrizione |
---|---|
name | (Obbligatorio) Nome della variabile che rappresenta il messaggio della coda o dell'argomento nel codice della funzione. |
brokerList | (Obbligatorio) Elenco di broker Kafka monitorati dal trigger. Per altre informazioni, vedere Connessioni . |
topic | (Obbligatorio) Argomento monitorato dal trigger. |
cardinality | (Facoltativo) Indica la cardinalità dell'input del trigger. I valori supportati sono ONE (impostazione predefinita) e MANY . Usare ONE quando l'input è un singolo messaggio e MANY quando l'input è una matrice di messaggi. Quando si usa MANY , è necessario impostare anche un oggetto dataType . |
dataType | Definisce il modo in cui Funzioni gestisce il valore del parametro. Per impostazione predefinita, il valore viene ottenuto come stringa e Functions tenta di deserializzare la stringa in un oggetto Java normale (POJO). Quando string , l'input viene considerato come una stringa. Quando binary , il messaggio viene ricevuto come dati binari e Funzioni tenta di deserializzarlo in un byte di tipo di parametro effettivo[]. |
consumerGroup | (Facoltativo) Gruppo di consumer Kafka usato dal trigger. |
avroSchema | (Facoltativo) Schema di un record generico quando si usa il protocollo Avro. |
authenticationMode | (Facoltativo) Modalità di autenticazione quando si usa l'autenticazione SASL (Simple Authentication and Security Layer). I valori supportati sono Gssapi , Plain (impostazione predefinita), ScramSha256 , ScramSha512 . |
username | (Facoltativo) Nome utente per l'autenticazione SASL. Non supportato quando AuthenticationMode è Gssapi . Per altre informazioni, vedere Connessioni . |
password | (Facoltativo) Password per l'autenticazione SASL. Non supportato quando AuthenticationMode è Gssapi . Per altre informazioni, vedere Connessioni . |
protocol | (Facoltativo) Protocollo di sicurezza usato per la comunicazione con i broker. I valori supportati sono plaintext (impostazione predefinita), ssl , , sasl_plaintext sasl_ssl . |
sslCaLocation | (Facoltativo) Percorso del file di certificato DELLA CA per la verifica del certificato del broker. |
sslCertificateLocation | (Facoltativo) Percorso del certificato del client. |
sslKeyLocation | (Facoltativo) Percorso della chiave privata del client (PEM) usato per l'autenticazione. |
sslKeyPassword | (Facoltativo) Password per il certificato del client. |
Impostazione
Nella tabella seguente sono illustrate le proprietà di configurazione dell'associazione impostate nel file function.json.
Proprietà di function.json | Descrizione |
---|---|
type | (Obbligatorio) Deve essere impostato su kafkaTrigger . |
direction | (Obbligatorio) Deve essere impostato su in . |
name | (Obbligatorio) Nome della variabile che rappresenta i dati negoziati nel codice della funzione. |
brokerList | (Obbligatorio) Elenco di broker Kafka monitorati dal trigger. Per altre informazioni, vedere Connessioni . |
topic | (Obbligatorio) Argomento monitorato dal trigger. |
cardinality | (Facoltativo) Indica la cardinalità dell'input del trigger. I valori supportati sono ONE (impostazione predefinita) e MANY . Usare ONE quando l'input è un singolo messaggio e MANY quando l'input è una matrice di messaggi. Quando si usa MANY , è necessario impostare anche un oggetto dataType . |
dataType | Definisce il modo in cui Funzioni gestisce il valore del parametro. Per impostazione predefinita, il valore viene ottenuto come stringa e Functions tenta di deserializzare la stringa in un oggetto Java normale (POJO). Quando string , l'input viene considerato come una stringa. Quando binary , il messaggio viene ricevuto come dati binari e Funzioni tenta di deserializzarlo in un byte di tipo di parametro effettivo[]. |
consumerGroup | (Facoltativo) Gruppo di consumer Kafka usato dal trigger. |
avroSchema | (Facoltativo) Schema di un record generico quando si usa il protocollo Avro. |
authenticationMode | (Facoltativo) Modalità di autenticazione quando si usa l'autenticazione SASL (Simple Authentication and Security Layer). I valori supportati sono Gssapi , Plain (impostazione predefinita), ScramSha256 , ScramSha512 . |
username | (Facoltativo) Nome utente per l'autenticazione SASL. Non supportato quando AuthenticationMode è Gssapi . Per altre informazioni, vedere Connessioni . |
password | (Facoltativo) Password per l'autenticazione SASL. Non supportato quando AuthenticationMode è Gssapi . Per altre informazioni, vedere Connessioni . |
protocol | (Facoltativo) Protocollo di sicurezza usato per la comunicazione con i broker. I valori supportati sono plaintext (impostazione predefinita), ssl , , sasl_plaintext sasl_ssl . |
sslCaLocation | (Facoltativo) Percorso del file di certificato DELLA CA per la verifica del certificato del broker. |
sslCertificateLocation | (Facoltativo) Percorso del certificato del client. |
sslKeyLocation | (Facoltativo) Percorso della chiave privata del client (PEM) usato per l'autenticazione. |
sslKeyPassword | (Facoltativo) Password per il certificato del client. |
Utilizzo
Gli eventi Kafka sono attualmente supportati come stringhe e matrici di stringhe che sono payload JSON.
I messaggi Kafka vengono passati alla funzione come stringhe e matrici di stringhe che sono payload JSON.
In un piano Premium è necessario abilitare il monitoraggio della scalabilità di runtime per consentire l'aumento del numero di istanze dell'output Kafka. Per altre informazioni, vedere Abilitare il ridimensionamento in fase di esecuzione.
Non è possibile usare la funzionalità Test/Esecuzione della pagina Codice e test nel portale di Azure per usare i trigger Kafka. È invece necessario inviare eventi di test direttamente all'argomento monitorato dal trigger.
Per un set completo di impostazioni di host.json supportate per il trigger Kafka, vedere host.json impostazioni.
Connessioni
Tutte le informazioni di connessione richieste dai trigger e dalle associazioni devono essere mantenute nelle impostazioni dell'applicazione e non nelle definizioni di associazione nel codice. Questo vale per le credenziali, che non devono mai essere archiviate nel codice.
Importante
Le impostazioni delle credenziali devono fare riferimento a un'impostazione dell'applicazione. Non impostare le credenziali hardcoded nei file di codice o di configurazione. Quando si esegue localmente, usare il file local.settings.json per le credenziali e non pubblicare il file local.settings.json.
Quando ci si connette a un cluster Kafka gestito fornito da Confluent in Azure, assicurarsi che nel trigger o nell'associazione siano impostate le credenziali di autenticazione seguenti per l'ambiente Confluent Cloud:
Impostazione | Valore consigliato | Descrizione |
---|---|---|
BrokerList | BootstrapServer |
L'impostazione dell'app denominata BootstrapServer contiene il valore del server bootstrap disponibile nella pagina delle impostazioni di Confluent Cloud. Il valore è simile a xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 . |
Nome utente | ConfluentCloudUsername |
L'impostazione dell'app denominata ConfluentCloudUsername contiene la chiave di accesso api dal sito Web Confluent Cloud. |
Password | ConfluentCloudPassword |
L'impostazione dell'app denominata ConfluentCloudPassword contiene il segreto API ottenuto dal sito Web Confluent Cloud. |
I valori stringa usati per queste impostazioni devono essere presenti come impostazioni dell'applicazione in Azure o nella Values
raccolta nel file local.settings.json durante lo sviluppo locale.
È anche necessario impostare , Protocol
AuthenticationMode
e SslCaLocation
nelle definizioni di associazione.