Azure Functions の Apache Kafka トリガー
Azure Functions で Apache Kafka トリガーを使用して、Kafka トピックのメッセージに応答して関数コードを実行できます。 Kafka 出力バインドを使用して、関数からトピックに書き込むこともできます。 セットアップと構成の詳細については、「Azure Functions における Apache Kafka バインドの概要」を参照してください。
重要
Kafka バインドは、エラスティック Premium プランおよび 専用 (App Service) プランの Functions でのみ使用できます。 これらは、バージョン 3.x 以降のバージョンの Functions ランタイムでのみサポートされます。
例
トリガーの使用方法は、拡張機能パッケージのバージョンと、関数アプリで使用される C# のモダリティによって異なり、次のモードのいずれかになります。
分離ワーカー プロセス クラス ライブラリでコンパイルされた C# 関数は、ランタイムから分離されたプロセスで実行されます。
使用する属性は、個別のイベント プロバイダーによって異なります。
次の例は、Kafka メッセージを Kafka イベントとして読み取り、ログする C# 関数を示しています。
[Function("KafkaTrigger")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}
バッチでイベントを受信するには、次の例に示すように、文字列配列を入力として使用します。
[Function("KafkaTriggerMany")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default",
IsBatched = true)] string[] events, FunctionContext context)
{
foreach (var kevent in events)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
}
次の関数により、Kafka イベントのメッセージとヘッダーがログされます。
[Function("KafkaTriggerWithHeaders")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var eventJsonObject = JObject.Parse(eventData);
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
var headersJArr = eventJsonObject["Headers"] as JArray;
logger.LogInformation("Headers for this event: ");
foreach (JObject header in headersJArr)
{
logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");
}
}
動作する .NET の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。
注意
同等の TypeScript の例については、Kafka 拡張機能リポジトリを参照してください
function.json ファイルの個別のプロパティは、イベント プロバイダー (以下の例では、Confluent または Azure Event Hubs のいずれかです) によって異なります。 次の例は、Kafka メッセージを読み取ってログする関数の Kafka トリガーを示しています。
次の function.json では、特定のプロバイダーのトリガーを定義しています。
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"topic": "topic",
"brokerList": "%BrokerList%",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "saslSsl",
"authenticationMode": "plain",
"consumerGroup" : "$Default",
"dataType": "string"
}
]
}
その後、関数がトリガーされたときに、次のコードが実行されます。
module.exports = async function (context, event) {
// context.log.info(event)
context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};
バッチでイベントを受信するには、次の例に示すように、function.json ファイルで cardinality
の値を many
に設定します。
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%"
}
]
}
次のコードにより、イベントの配列が解析され、イベント データがログされます。
module.exports = async function (context, events) {
function print(event) {
var eventJson = JSON.parse(event)
context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
}
events.map(print);
};
また、次のコードにより、ヘッダー データがログされます。
module.exports = async function (context, event) {
function print(kevent) {
var keventJson = JSON.parse(kevent)
context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
context.log.info(`Headers for this message:`)
let headers = keventJson.Headers;
headers.forEach(element => {
context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`)
});
}
event.map(print);
};
トリガーに渡されるイベントに対して汎用の Avro スキーマ を定義できます。 次の function.json では、汎用の Avro スキーマを使用して、特定のプロバイダーのトリガーを定義しています。
{
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaAvroGenericSingle",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
その後、関数がトリガーされたときに、次のコードが実行されます。
module.exports = async function (context, event) {
context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};
動作する JavaScript の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。
function.json ファイルの個別のプロパティは、イベント プロバイダー (以下の例では、Confluent または Azure Event Hubs のいずれかです) によって異なります。 次の例は、Kafka メッセージを読み取ってログする関数の Kafka トリガーを示しています。
次の function.json では、特定のプロバイダーのトリガーを定義しています。
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
その後、関数がトリガーされたときに、次のコードが実行されます。
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
バッチでイベントを受信するには、次の例に示すように、function.json ファイルで cardinality
の値を many
に設定します。
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
次のコードにより、イベントの配列が解析され、イベント データがログされます。
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
$event = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $event.Value"
}
また、次のコードにより、ヘッダー データがログされます。
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
foreach ($kafkaEvent in $kafkaEvents) {
$kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
Write-Output "Headers for this message:"
foreach ($header in $kevent.Headers) {
$DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
$Key = $header.Key
Write-Output "Key: $Key Value: $DecodedValue"
}
}
トリガーに渡されるイベントに対して汎用の Avro スキーマ を定義できます。 次の function.json では、汎用の Avro スキーマを使用して、特定のプロバイダーのトリガーを定義しています。
{
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaEvent",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
その後、関数がトリガーされたときに、次のコードが実行されます。
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
動作する PowerShell の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。
function.json ファイルの個別のプロパティは、イベント プロバイダー (以下の例では、Confluent または Azure Event Hubs のいずれかです) によって異なります。 次の例は、Kafka メッセージを読み取ってログする関数の Kafka トリガーを示しています。
次の function.json では、特定のプロバイダーのトリガーを定義しています。
{
"scriptFile": "main.py",
"bindings": [
{
"type": "kafkaTrigger",
"name": "kevent",
"topic": "topic",
"brokerList": "%BrokerList%",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"consumerGroup" : "functions",
"protocol": "saslSsl",
"authenticationMode": "plain"
}
]
}
その後、関数がトリガーされたときに、次のコードが実行されます。
import logging
from azure.functions import KafkaEvent
def main(kevent : KafkaEvent):
logging.info(kevent.get_body().decode('utf-8'))
logging.info(kevent.metadata)
バッチでイベントを受信するには、次の例に示すように、function.json ファイルで cardinality
の値を many
に設定します。
{
"scriptFile": "main.py",
"bindings": [
{
"type" : "kafkaTrigger",
"direction": "in",
"name" : "kevents",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"topic" : "message_python",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"dataType": "string",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"BrokerList" : "%BrokerList%"
}
]
}
次のコードにより、イベントの配列が解析され、イベント データがログされます。
import logging
import typing
from azure.functions import KafkaEvent
def main(kevents : typing.List[KafkaEvent]):
for event in kevents:
logging.info(event.get_body())
また、次のコードにより、ヘッダー データがログされます。
import logging
import typing
from azure.functions import KafkaEvent
import json
import base64
def main(kevents : typing.List[KafkaEvent]):
for event in kevents:
event_dec = event.get_body().decode('utf-8')
event_json = json.loads(event_dec)
logging.info("Python Kafka trigger function called for message " + event_json["Value"])
headers = event_json["Headers"]
for header in headers:
logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))
トリガーに渡されるイベントに対して汎用の Avro スキーマ を定義できます。 次の function.json では、汎用の Avro スキーマを使用して、特定のプロバイダーのトリガーを定義しています。
{
"scriptFile": "main.py",
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaTriggerAvroGeneric",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
その後、関数がトリガーされたときに、次のコードが実行されます。
import logging
from azure.functions import KafkaEvent
def main(kafkaTriggerAvroGeneric : KafkaEvent):
logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
logging.info(kafkaTriggerAvroGeneric.metadata)
動作する Python の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。
トリガーを構成するために使用する注釈は、個別のイベント プロバイダーによって異なります。
次の例は、Kafka イベントの内容を読み取り、ログする Java 関数を示しています。
@FunctionName("KafkaTrigger")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string"
) String kafkaEventData,
final ExecutionContext context) {
context.getLogger().info(kafkaEventData);
}
バッチでイベントを受信するには、次の例に示すように、入力文字列を配列として使用します。
@FunctionName("KafkaTriggerMany")
public void runMany(
@KafkaTrigger(
name = "kafkaTriggerMany",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
cardinality = Cardinality.MANY,
dataType = "string"
) String[] kafkaEvents,
final ExecutionContext context) {
for (String kevent: kafkaEvents) {
context.getLogger().info(kevent);
}
}
次の関数により、Kafka イベントのメッセージとヘッダーがログされます。
@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string",
cardinality = Cardinality.MANY
) List<String> kafkaEvents,
final ExecutionContext context) {
Gson gson = new Gson();
for (String keventstr: kafkaEvents) {
KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
context.getLogger().info("Headers for the message:");
for (KafkaHeaders header : kevent.Headers) {
String decodedValue = new String(Base64.getDecoder().decode(header.Value));
context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);
}
}
}
トリガーに渡されるイベントに対して汎用の Avro スキーマ を定義できます。 次の関数では、汎用の Avro スキーマを使用して、特定のプロバイダーのトリガーを定義しています。
private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";
@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
@KafkaTrigger(
name = "kafkaAvroGenericSingle",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "ConfluentCloudUsername",
password = "ConfluentCloudPassword",
avroSchema = schema,
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL) Payment payment,
final ExecutionContext context) {
context.getLogger().info(payment.toString());
}
Confluent で動作する Java の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。
属性
インプロセスと分離ワーカー プロセスの C# ライブラリはどちらも、KafkaTriggerAttribute
を使用して関数トリガーを定義します。
次の表では、このトリガー属性を使用して設定できるプロパティについて説明します。
パラメーター | 説明 |
---|---|
BrokerList | (必須) トリガーによって監視される Kafka ブローカーの一覧。 詳細については、「接続」を参照してください。 |
トピック | (必須) トリガーによって監視されるトピック。 |
ConsumerGroup | (省略可能) トリガーで使用される Kafka コンシューマー グループ。 |
AvroSchema | (省略可能) Avro プロトコルを使用する場合の汎用レコードのスキーマ。 |
AuthenticationMode | (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされる値は、Gssapi 、Plain (既定値)、ScramSha256 、ScramSha512 です。 |
ユーザー名 | (省略可能) SASL 認証のユーザー名。 AuthenticationMode が Gssapi の場合はサポートされません。 詳細については、「接続」を参照してください。 |
パスワード | (省略可能) SASL 認証のパスワード。 AuthenticationMode が Gssapi の場合はサポートされません。 詳細については、「接続」を参照してください。 |
プロトコル | (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされる値は、plaintext (既定値)、ssl 、sasl_plaintext 、sasl_ssl です。 |
SslCaLocation | (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。 |
SslCertificateLocation | (省略可能) クライアントの証明書へのパス。 |
SslKeyLocation | (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。 |
SslKeyPassword | (省略可能) クライアントの証明書のパスワード。 |
注釈
KafkaTrigger
注釈を使用すると、トピックが受信されたときに実行する関数を作成できます。 サポートされるオプションには、次の要素が含まれます。
要素 | 説明 |
---|---|
name | (必須) 関数コード内のキューまたはトピック メッセージを表す変数の名前。 |
brokerList | (必須) トリガーによって監視される Kafka ブローカーの一覧。 詳細については、「接続」を参照してください。 |
topic | (必須) トリガーによって監視されるトピック。 |
cardinality | (省略可能) トリガー入力のカーディナリティを示します。 サポートされる値は、ONE (既定値) とMANY です。 入力が 1 つのメッセージである場合は ONE 、入力がメッセージの配列である場合は MANY を使用します。 MANY を使用する場合は、dataType も設定する必要があります。 |
dataType | Functions によりパラメーター値が処理される方法を定義します。 既定では、値は文字列として取得され、Functions により文字列を実際の単純な従来の Java オブジェクト (POJO) への逆シリアル化が試みられます。 string の場合、入力は単なる文字列として扱われます。 binary の場合、メッセージはバイナリ データとして受信され、Functions により実際のパラメーター型 byte[] への逆シリアル化が試みられます。 |
consumerGroup | (省略可能) トリガーで使用される Kafka コンシューマー グループ。 |
avroSchema | (省略可能) Avro プロトコルを使用する場合の汎用レコードのスキーマ。 |
authenticationMode | (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされる値は、Gssapi 、Plain (既定値)、ScramSha256 、ScramSha512 です。 |
username | (省略可能) SASL 認証のユーザー名。 AuthenticationMode が Gssapi の場合はサポートされません。 詳細については、「接続」を参照してください。 |
password | (省略可能) SASL 認証のパスワード。 AuthenticationMode が Gssapi の場合はサポートされません。 詳細については、「接続」を参照してください。 |
protocol | (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされる値は、plaintext (既定値)、ssl 、sasl_plaintext 、sasl_ssl です。 |
sslCaLocation | (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。 |
sslCertificateLocation | (省略可能) クライアントの証明書へのパス。 |
sslKeyLocation | (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。 |
sslKeyPassword | (省略可能) クライアントの証明書のパスワード。 |
構成
次の表は、function.json ファイルで設定したバインド構成のプロパティを説明しています。
function.json のプロパティ | 説明 |
---|---|
type | (必須) kafkaTrigger に設定する必要があります。 |
direction | (必須) in に設定する必要があります。 |
name | (必須) 関数コード内のブローカー データを表す変数の名前。 |
brokerList | (必須) トリガーによって監視される Kafka ブローカーの一覧。 詳細については、「接続」を参照してください。 |
topic | (必須) トリガーによって監視されるトピック。 |
cardinality | (省略可能) トリガー入力のカーディナリティを示します。 サポートされる値は、ONE (既定値) とMANY です。 入力が 1 つのメッセージである場合は ONE 、入力がメッセージの配列である場合は MANY を使用します。 MANY を使用する場合は、dataType も設定する必要があります。 |
dataType | Functions によりパラメーター値が処理される方法を定義します。 既定では、値は文字列として取得され、Functions により文字列を実際の単純な従来の Java オブジェクト (POJO) への逆シリアル化が試みられます。 string の場合、入力は単なる文字列として扱われます。 binary の場合、メッセージはバイナリ データとして受信され、Functions により実際のパラメーター型 byte[] への逆シリアル化が試みられます。 |
consumerGroup | (省略可能) トリガーで使用される Kafka コンシューマー グループ。 |
avroSchema | (省略可能) Avro プロトコルを使用する場合の汎用レコードのスキーマ。 |
authenticationMode | (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされる値は、Gssapi 、Plain (既定値)、ScramSha256 、ScramSha512 です。 |
username | (省略可能) SASL 認証のユーザー名。 AuthenticationMode が Gssapi の場合はサポートされません。 詳細については、「接続」を参照してください。 |
password | (省略可能) SASL 認証のパスワード。 AuthenticationMode が Gssapi の場合はサポートされません。 詳細については、「接続」を参照してください。 |
protocol | (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされる値は、plaintext (既定値)、ssl 、sasl_plaintext 、sasl_ssl です。 |
sslCaLocation | (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。 |
sslCertificateLocation | (省略可能) クライアントの証明書へのパス。 |
sslKeyLocation | (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。 |
sslKeyPassword | (省略可能) クライアントの証明書のパスワード。 |
使用方法
Kafka イベントは現在、JSON ペイロードである文字列および文字列配列としてサポートされています。
Kafka メッセージは、JSON ペイロードである文字列および文字列配列として関数に渡されます。
Premium プランでは、Kafka 出力のランタイム スケール監視を有効にして、複数のインスタンスにスケール アウトできるようにする必要があります。 詳細については、「ランタイム スケールを有効にする」を参照してください。
Azure Portal の [コードとテスト] ページの [テストと実行] 機能を使って Kafka トリガーを操作することはできません。 代わりに、トリガーによって監視されているトピックにテスト イベントを直接送信する必要があります。
Kafka トリガーでサポートされている host.json 設定の完全なセットについては、「host.json 設定」を参照してください。
接続
トリガーとバインドに必要なすべての接続情報は、コード内のバインド定義ではなく、アプリケーション設定に保持する必要があります。 これは、コードに格納してはならない資格情報にも言えることです。
重要
資格情報の設定はアプリケーション設定を参照する必要があります。 コードや構成のファイル内に資格情報をハードコーディングしないでください。 ローカルで実行する場合は、資格情報に local.settings.json ファイルを使用します。local.settings.json ファイルは公開しないでください。
Azure の Confluent によって提供されるマネージド Kafka クラスターに接続する場合は、Confluent Cloud 環境の次の認証資格情報がトリガーまたはバインドに設定されていることを確認します。
設定 | 推奨値 | 説明 |
---|---|---|
BrokerList | BootstrapServer |
BootstrapServer という名前のアプリ設定には、Confluent Cloud の設定ページで検出されたブートストラップ サーバーの値が含まれています。 値は xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 のようになります。 |
ユーザー名 | ConfluentCloudUsername |
ConfluentCloudUsername という名前のアプリ設定には、Confluent Cloud Web サイトからの API アクセス キーが含まれています。 |
パスワード | ConfluentCloudPassword |
ConfluentCloudPassword という名前のアプリ設定には、Confluent Cloud Web サイトから取得した API シークレットが含まれています。 |
これらの設定に使用する文字列値は、Azure のアプリケーション設定として、またはローカル開発中に local.settings.json ファイル内の Values
コレクションに存在する必要があります。
また、バインド定義で Protocol
、AuthenticationMode
、SslCaLocation
を設定する必要があります。