다음을 통해 공유


Azure Functions에 대한 Apache Kafka 출력 바인딩

출력 바인딩을 사용하면 Azure Functions 앱이 Kafka 토픽에 메시지를 쓸 수 있습니다.

Important

Kafka 바인딩은 탄력적 프리미엄 플랜전용(App Service) 플랜의 Functions에만 사용할 수 있습니다. Functions 런타임 버전 3.x 이상에서만 지원됩니다.

예시

바인딩 사용은 함수 앱에서 사용되는 C# 양식에 따라 다르며, 양식은 다음 중 하나입니다.

격리된 작업자 프로세스 클래스 라이브러리 컴파일된 C# 함수는 런타임에서 격리된 프로세스에서 실행됩니다.

사용하는 특성은 이벤트 공급자에 따라 달라집니다.

다음 예제에서는 HTTP 응답 및 Kafka 출력으로 구성되는 사용자 지정 반환 형식 MultipleOutputType을 사용합니다.

[Function("KafkaOutput")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();

    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = message,
        HttpResponse = response
    };
}

MultipleOutputType 클래스에서 Kevent는 Kafka 바인딩에 대한 출력 바인딩 변수입니다.

public class MultipleOutputType
{
    [KafkaOutput("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string Kevent { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

이벤트 일괄 처리를 보내려면 다음 예제와 같이 문자열 배열을 출력 형식으로 전달합니다.

[Function("KafkaOutputMany")]

public static MultipleOutputTypeForBatch Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");
    var response = req.CreateResponse(HttpStatusCode.OK);

    string[] messages = new string[2];
    messages[0] = "one";
    messages[1] = "two";

    return new MultipleOutputTypeForBatch()
    {
        Kevents = messages,
        HttpResponse = response
    };
}

문자열 배열은 클래스의 Kevents 속성으로 정의되고, 여기에서 출력 바인딩이 정의됩니다.

public class MultipleOutputTypeForBatch
{
    [KafkaOutput("BrokerList",
                 "topic",
                 Username = "ConfluentCloudUserName",
                 Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string[] Kevents { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

다음 함수는 Kafka 출력 데이터에 헤더를 추가합니다.

[Function("KafkaOutputWithHeaders")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = kevent,
        HttpResponse = response
    };
}

작동하는 .NET 전체 예제는 Kafka 확장 리포지토리를 참조하세요.

참고 항목

상응하는 TypeScript 예제 세트는 Kafka 확장 리포지토리를 참조하세요.

function.json 파일의 특정 속성은 이벤트 공급자에 따라 달라지며, 다음 예제에서는 Confluent 또는 Azure Event Hubs입니다. 다음 예제에서는 HTTP 요청에 의해 트리거되고 요청의 데이터를 Kafka 토픽으로 보내는 함수의 Kafka 출력 바인딩을 보여줍니다.

아래 예제의 다음 function.json은 특정 공급자의 트리거를 정의합니다.

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputKafkaMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "ConfluentCloudUsername",
      "password": "ConfluentCloudPassword",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    }
  ]
}

다음 코드는 토픽에 메시지를 보냅니다.

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message);
    context.bindings.outputKafkaMessage = message;
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: 'Ok'
    };
}

다음 코드는 동일한 토픽에 여러 메시지를 배열로 보냅니다.

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');
    
    context.bindings.outputKafkaMessages = ["one", "two"];
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

다음 예제에서는 헤더가 있는 이벤트 메시지를 동일한 Kafka 토픽으로 보내는 방법을 보여줍니다.

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message || (req.body && req.body.message));
    const responseMessage = message
        ? "Message received: " + message + ". The message transfered to the kafka broker."
        : "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
    context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

작동하는 JavaScript 전체 예제는 Kafka 확장 리포지토리를 참조하세요.

function.json 파일의 특정 속성은 이벤트 공급자에 따라 달라지며, 다음 예제에서는 Confluent 또는 Azure Event Hubs입니다. 다음 예제에서는 HTTP 요청에 의해 트리거되고 요청의 데이터를 Kafka 토픽으로 보내는 함수의 Kafka 출력 바인딩을 보여줍니다.

아래 예제의 다음 function.json은 특정 공급자의 트리거를 정의합니다.

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "Response"
    }
  ]
}

다음 코드는 토픽에 메시지를 보냅니다.

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message

$message

Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

다음 코드는 동일한 토픽에 여러 메시지를 배열로 보냅니다.

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

다음 예제에서는 헤더가 있는 이벤트 메시지를 동일한 Kafka 토픽으로 보내는 방법을 보여줍니다.

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message
}

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"
    }
    )
}

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'
})

작동하는 PowerShell 예제의 전체 집합은 Kafka 확장 리포지토리를 참조하세요.

function.json 파일의 특정 속성은 이벤트 공급자에 따라 달라지며, 다음 예제에서는 Confluent 또는 Azure Event Hubs입니다. 다음 예제에서는 HTTP 요청에 의해 트리거되고 요청의 데이터를 Kafka 토픽으로 보내는 함수의 Kafka 출력 바인딩을 보여줍니다.

아래 예제의 다음 function.json은 특정 공급자의 트리거를 정의합니다.

{
  "scriptFile": "main.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "direction": "out",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "%ConfluentCloudUserName%",
      "password": "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

다음 코드는 토픽에 메시지를 보냅니다.

import logging

import azure.functions as func


def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'

다음 코드는 동일한 토픽에 여러 메시지를 배열로 보냅니다.

import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json

def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
    outputMessage.set(['one', 'two'])
    return 'OK'

다음 예제에서는 헤더가 있는 이벤트 메시지를 동일한 Kafka 토픽으로 보내는 방법을 보여줍니다.

import logging

import azure.functions as func
import json

def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

작동하는 Python 전체 예제는 Kafka 확장 리포지토리를 참조하세요.

출력 바인딩을 구성하는 데 사용하는 주석은 이벤트 공급자에 따라 달라집니다.

다음 함수는 Kafka 토픽에 메시지를 보냅니다.

@FunctionName("KafkaOutput")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");

    // Parse query parameter
    String query = request.getQueryParameters().get("message");
    String message = request.getBody().orElse(query);
    context.getLogger().info("Message:" + message);
    output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

다음 예제에서는 Kafka 토픽에 여러 메시지를 보내는 방법을 보여줍니다.

@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String[]> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");
    String[] messages = new String[2];
    messages[0] = "one";
    messages[1] = "two";
    output.setValue(messages);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}

이 예제에서는 출력 바인딩 매개 변수가 문자열 배열로 변경됩니다.

마지막 예제에서는 KafkaEntityKafkaHeader 클래스를 사용합니다.

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

다음 예제 함수는 Kafka 토픽에 헤더가 있는 메시지를 보냅니다.

@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<KafkaEntity> output,
        final ExecutionContext context) {
            context.getLogger().info("Java HTTP trigger processed a request.");
    
            // Parse query parameter
            String query = request.getQueryParameters().get("message");
            String message = request.getBody().orElse(query);
            KafkaHeaders[] headers = new KafkaHeaders[1];
            headers[0] = new KafkaHeaders("test", "java");
            KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
            output.setValue(kevent);
            return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
        }

Confluent에서 작동하는 Java 전체 예제는 Kafka 확장 리포지토리를 참조하세요.

특성

In Process격리된 작업자 프로세스 C# 라이브러리는 모두 Kafka 특성을 사용하여 함수 트리거를 정의합니다.

다음 표에서는 이 특성을 사용하여 설정할 수 있는 속성에 대해 설명합니다.

매개 변수 설명
BrokerList (필수) 출력이 전송되는 Kafka broker 목록입니다. 자세한 내용은 연결을 참조하세요.
항목 (필수) 출력이 전송되는 토픽입니다.
AvroSchema (선택 사항) Avro 프로토콜을 사용하는 경우 제네릭 레코드의 스키마입니다.
MaxMessageBytes (선택 사항) 전송되는 출력 메시지의 최대 크기(MB)이며, 기본값은 1입니다.
BatchSize (선택 사항) 단일 메시지 세트에 일괄 처리되는 최대 메시지 수이며, 기본값은 10000입니다.
EnableIdempotence (선택 사항) true로 설정하면 메시지가 정확히 한 번만 원래 생산 순서대로 생성되며, 기본값은 false입니다.
MessageTimeoutMs (선택 사항) 로컬 메시지 시간 제한(밀리초)입니다. 이 값은 로컬에만 적용되며, 생성된 메시지가 성공적으로 배달되기를 기다리는 시간을 제한합니다. 기본값은 300000입니다. 0은 무한입니다. 이 값은 메시지를 배달하는 데 사용되는 최대 시간(재시도 포함)입니다. 배달 오류는 재시도 횟수 또는 메시지 시간 제한을 초과할 때 발생합니다.
RequestTimeoutMs (선택 사항) 출력 요청의 승인 시간 제한(밀리초)이며, 기본값은 5000입니다.
MaxRetries (선택 사항) 실패한 메시지 전송을 다시 시도하는 횟수이며, 기본값은 2입니다. EnableIdempotencetrue로 설정되지 않는 한, 다시 시도할 때 순서가 재정렬될 수 있습니다.
AuthenticationMode (선택 사항) SASL(단순 인증 및 보안 계층) 인증을 사용할 때의 인증 모드입니다. 지원되는 값은 Gssapi, Plain(기본값), ScramSha256, ScramSha512입니다.
사용자 이름 (선택 사항) SASL 인증의 사용자 이름입니다. AuthenticationModeGssapi일 때는 지원되지 않습니다. 자세한 내용은 연결을 참조하세요.
암호 (선택 사항) SASL 인증의 암호입니다. AuthenticationModeGssapi일 때는 지원되지 않습니다. 자세한 내용은 연결을 참조하세요.
프로토콜 (선택 사항) broker와 통신할 때 사용되는 보안 프로토콜입니다. 지원되는 값은 plaintext(기본값), ssl, sasl_plaintext, sasl_ssl입니다.
SslCaLocation (선택 사항) broker의 인증서를 확인하기 위한 CA 인증서 파일의 경로입니다.
SslCertificateLocation (선택 사항) 클라이언트의 인증서 경로입니다.
SslKeyLocation (선택 사항) 인증에 사용되는 클라이언트의 프라이빗 키(PEM) 경로입니다.
SslKeyPassword (선택 사항) 클라이언트의 인증서 암호입니다.

주석

KafkaOutput 주석을 사용하면 특정 토픽에 쓰는 함수를 만들 수 있습니다. 지원되는 옵션에는 다음 요소가 포함됩니다.

요소 설명
이름 함수 코드에서 조정된 데이터를 나타내는 변수의 이름입니다.
brokerList (필수) 출력이 전송되는 Kafka broker 목록입니다. 자세한 내용은 연결을 참조하세요.
topic (필수) 출력이 전송되는 토픽입니다.
dataType Functions에서 매개 변수 값을 처리하는 방법을 정의합니다. 기본적으로 값은 문자열로 가져오며, Functions는 문자열을 실제 POJO(Plain-Old Java Object)로 역직렬화하려고 합니다. string인 경우 입력은 문자열로만 처리됩니다. binary인 경우 메시지가 이진 데이터로 수신되고 Functions에서 실제 매개 변수 형식 byte[]로 역직렬화하려고 시도합니다.
avroSchema (선택 사항) Avro 프로토콜을 사용하는 경우 제네릭 레코드의 스키마입니다. (현재 Java에는 지원되지 않습니다.)
maxMessageBytes (선택 사항) 전송되는 출력 메시지의 최대 크기(MB)이며, 기본값은 1입니다.
batchSize (선택 사항) 단일 메시지 세트에 일괄 처리되는 최대 메시지 수이며, 기본값은 10000입니다.
enableIdempotence (선택 사항) true로 설정하면 메시지가 정확히 한 번만 원래 생산 순서대로 생성되며, 기본값은 false입니다.
messageTimeoutMs (선택 사항) 로컬 메시지 시간 제한(밀리초)입니다. 이 값은 로컬에만 적용되며, 생성된 메시지가 성공적으로 배달되기를 기다리는 시간을 제한합니다. 기본값은 300000입니다. 0은 무한입니다. 메시지를 배달하는 데 사용되는 최대 시간(재시도 포함)입니다. 배달 오류는 재시도 횟수 또는 메시지 시간 제한을 초과할 때 발생합니다.
requestTimeoutMs (선택 사항) 출력 요청의 승인 시간 제한(밀리초)이며, 기본값은 5000입니다.
maxRetries (선택 사항) 실패한 메시지 전송을 다시 시도하는 횟수이며, 기본값은 2입니다. EnableIdempotencetrue로 설정되지 않는 한, 다시 시도할 때 순서가 재정렬될 수 있습니다.
authenticationMode (선택 사항) SASL(단순 인증 및 보안 계층) 인증을 사용할 때의 인증 모드입니다. 지원되는 값은 Gssapi, Plain(기본값), ScramSha256, ScramSha512입니다.
username (선택 사항) SASL 인증의 사용자 이름입니다. AuthenticationModeGssapi일 때는 지원되지 않습니다. 자세한 내용은 연결을 참조하세요.
password (선택 사항) SASL 인증의 암호입니다. AuthenticationModeGssapi일 때는 지원되지 않습니다. 자세한 내용은 연결을 참조하세요.
protocol (선택 사항) broker와 통신할 때 사용되는 보안 프로토콜입니다. 지원되는 값은 plaintext(기본값), ssl, sasl_plaintext, sasl_ssl입니다.
sslCaLocation (선택 사항) broker의 인증서를 확인하기 위한 CA 인증서 파일의 경로입니다.
sslCertificateLocation (선택 사항) 클라이언트의 인증서 경로입니다.
sslKeyLocation (선택 사항) 인증에 사용되는 클라이언트의 프라이빗 키(PEM) 경로입니다.
sslKeyPassword (선택 사항) 클라이언트의 인증서 암호입니다.

구성

다음 표에서는 function.json 파일에 설정된 바인딩 구성 속성을 설명합니다.

function.json 속성 설명
type kafka로 설정해야 합니다.
direction out로 설정해야 합니다.
이름 함수 코드에서 조정된 데이터를 나타내는 변수의 이름입니다.
brokerList (필수) 출력이 전송되는 Kafka broker 목록입니다. 자세한 내용은 연결을 참조하세요.
topic (필수) 출력이 전송되는 토픽입니다.
avroSchema (선택 사항) Avro 프로토콜을 사용하는 경우 제네릭 레코드의 스키마입니다.
maxMessageBytes (선택 사항) 전송되는 출력 메시지의 최대 크기(MB)이며, 기본값은 1입니다.
batchSize (선택 사항) 단일 메시지 세트에 일괄 처리되는 최대 메시지 수이며, 기본값은 10000입니다.
enableIdempotence (선택 사항) true로 설정하면 메시지가 정확히 한 번만 원래 생산 순서대로 생성되며, 기본값은 false입니다.
messageTimeoutMs (선택 사항) 로컬 메시지 시간 제한(밀리초)입니다. 이 값은 로컬에만 적용되며, 생성된 메시지가 성공적으로 배달되기를 기다리는 시간을 제한합니다. 기본값은 300000입니다. 0은 무한입니다. 메시지를 배달하는 데 사용되는 최대 시간(재시도 포함)입니다. 배달 오류는 재시도 횟수 또는 메시지 시간 제한을 초과할 때 발생합니다.
requestTimeoutMs (선택 사항) 출력 요청의 승인 시간 제한(밀리초)이며, 기본값은 5000입니다.
maxRetries (선택 사항) 실패한 메시지 전송을 다시 시도하는 횟수이며, 기본값은 2입니다. EnableIdempotencetrue로 설정되지 않는 한, 다시 시도할 때 순서가 재정렬될 수 있습니다.
authenticationMode (선택 사항) SASL(단순 인증 및 보안 계층) 인증을 사용할 때의 인증 모드입니다. 지원되는 값은 Gssapi, Plain(기본값), ScramSha256, ScramSha512입니다.
username (선택 사항) SASL 인증의 사용자 이름입니다. AuthenticationModeGssapi일 때는 지원되지 않습니다. 자세한 내용은 연결을 참조하세요.
password (선택 사항) SASL 인증의 암호입니다. AuthenticationModeGssapi일 때는 지원되지 않습니다. 자세한 내용은 연결을 참조하세요.
protocol (선택 사항) broker와 통신할 때 사용되는 보안 프로토콜입니다. 지원되는 값은 plaintext(기본값), ssl, sasl_plaintext, sasl_ssl입니다.
sslCaLocation (선택 사항) broker의 인증서를 확인하기 위한 CA 인증서 파일의 경로입니다.
sslCertificateLocation (선택 사항) 클라이언트의 인증서 경로입니다.
sslKeyLocation (선택 사항) 인증에 사용되는 클라이언트의 프라이빗 키(PEM) 경로입니다.
sslKeyPassword (선택 사항) 클라이언트의 인증서 암호입니다.

사용

키와 값 형식 둘 다 기본 제공 AvroProtobuf 직렬화에서 지원됩니다.

이벤트의 오프셋, 파티션 및 타임스탬프는 런타임에 생성됩니다. 함수 내에서는 값과 헤더만 설정할 수 있습니다. 토픽은 function.json에서 설정합니다.

쓰려는 Kafka 토픽에 액세스할 수 있는지 확인하세요. 액세스 및 연결 자격 증명을 사용하여 Kafka 토픽에 대한 바인딩을 구성합니다.

프리미엄 플랜에서는 Kafka 출력이 여러 인스턴스로 스케일 아웃될 수 있도록 런타임 스케일링 모니터링을 사용하도록 설정해야 합니다. 자세한 내용은 런타임 스케일링 사용을 참조하세요.

Kafka 트리거를 지원하는 host.json 전체 설정은 host.json 설정을 참조하세요.

연결

트리거 및 바인딩에 필요한 모든 연결 정보는 코드의 바인딩 정의가 아닌 애플리케이션 설정에서 유지 관리되어야 합니다. 이는 절대로 코드에 저장하면 안 되는 자격 증명에 적용됩니다.

Important

자격 증명 설정은 애플리케이션 설정을 참조해야 합니다. 코드 또는 구성 파일에서 자격 증명을 하드 코딩하지 마세요. 로컬로 실행하는 경우 자격 증명에 local.settings.json 파일을 사용하고 local.settings.json 파일을 게시하지 마세요.

Azure의 Confluent에서 제공하는 관리형 Kafka 클러스터에 연결할 때 Confluent Cloud 환경에 대한 다음 인증 자격 증명이 트리거 또는 바인딩에 설정되어 있는지 확인합니다.

설정 권장 값 설명
BrokerList BootstrapServer BootstrapServer라는 앱 설정에는 Confluent Cloud 설정 페이지에 있는 부트스트랩 서버의 값이 포함됩니다. 값은 xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 형식입니다.
사용자 이름 ConfluentCloudUsername ConfluentCloudUsername이라는 앱 설정에는 Confluent Cloud 웹 사이트의 API 액세스 키가 포함됩니다.
암호 ConfluentCloudPassword ConfluentCloudPassword라는 앱 설정에는 Confluent Cloud 웹 사이트의 API 비밀이 포함됩니다.

로컬 개발 중에 이러한 설정에 사용하는 문자열 값은 Azure의 애플리케이션 설정으로 또는 local.settings.json 파일Values 컬렉션에 있어야 합니다.

또한 바인딩 정의에서 Protocol, AuthenticationModeSslCaLocation을 설정해야 합니다.

다음 단계