다음을 통해 공유


read_kafka 테이블 값 함수

적용 대상:예로 표시된 확인 Databricks SQL 예로 표시된 확인 Databricks Runtime 13.3 LTS 이상

Apache Kafka 클러스터에서 데이터를 읽고 테이블 형식으로 데이터를 반환합니다.

하나 이상의 Kafka 토픽에서 데이터를 읽을 수 있습니다. 일괄 처리 쿼리와 스트리밍 수집을 모두 지원합니다.

구문

read_kafka([option_key => option_value ] [, ...])

인수

이 함수에는 명명된 매개 변수 호출이 필요합니다.

  • option_key: 구성할 옵션의 이름입니다. 점()을 포함하는 옵션에는 백틱(.')을 사용해야 합니다.
  • option_value: 옵션을 설정하는 상수 식입니다. 리터럴 및 스칼라 함수를 허용합니다.

반품

다음 스키마를 사용하여 Apache Kafka 클러스터에서 읽은 레코드:

  • key BINARY: Kafka 레코드의 키입니다.
  • value BINARY NOT NULL: Kafka 레코드의 값입니다.
  • topic STRING NOT NULL: 레코드를 읽은 Kafka 토픽의 이름입니다.
  • partition INT NOT NULL: 레코드를 읽은 Kafka 파티션의 ID입니다.
  • offset BIGINT NOT NULL: Kafka TopicPartition레코드의 오프셋 번호입니다.
  • timestamp TIMESTAMP NOT NULL: 레코드의 타임스탬프 값입니다. timestampType 열은 이 타임스탬프가 해당하는 항목을 정의합니다.
  • timestampType INTEGER NOT NULL: timestamp 열에 지정된 타임스탬프의 형식입니다.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: 레코드의 일부로 제공된 헤더 값입니다(사용하도록 설정된 경우).

예제

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

옵션

Apache Spark 설명서자세한 옵션 목록을 찾을 수 있습니다.

필수 옵션

Kafka 클러스터에 연결하기 위한 아래 옵션을 제공합니다.

옵션
bootstrapServers

유형: String

Kafka 클러스터를 가리키는 호스트/포트 쌍의 쉼표로 구분된 목록입니다.

기본값: 없음

아래 옵션 중 하나만 제공하여 데이터를 가져올 Kafka 토픽을 구성합니다.

옵션
assign

유형: String

사용할 특정 토픽 파티션을 포함하는 JSON 문자열입니다. 예를 들어 '{"topicA":[0,1],"topicB":[2,4]}'topicA의 0번째 및 첫 번째 파티션은 소비됩니다.

기본값: 없음
subscribe

유형: String

읽을 Kafka 항목의 쉼표로 구분된 목록입니다.

기본값: 없음
subscribePattern

유형: String

구독할 정규식 일치 항목입니다.

기본값: 없음

기타 옵션

read_kafka 는 일괄 처리 쿼리 및 스트리밍 쿼리에서 사용할 수 있습니다. 아래 옵션은 적용할 쿼리 유형을 지정합니다.

옵션
endingOffsets

형식: String 쿼리 형식: 일괄 처리만

일괄 처리 쿼리에서 읽어야 할 오프셋은 최신 레코드를 지정하는 "latest" 또는 각 TopicPartition에 대한 끝 오프셋을 지정하는 JSON 문자열 중 하나로 나타낼 수 있습니다. JSON에서 -1 오프셋으로 사용하여 최신 항목을 참조할 수 있습니다. 오프셋으로 -2(초기)는 허용되지 않습니다.

기본값: "latest"
endingOffsetsByTimestamp

형식: String 쿼리 형식: 일괄 처리만

각 TopicPartition에 대해 읽을 끝 타임스탬프를 지정하는 JSON 문자열입니다. 타임스탬프는 타임스탬프의 긴 값(예: 밀리초)으로 제공되어야 합니다.1970-01-01 00:00:00 UTC
1686444353000. 타임스탬프가 있는 동작에 대한 자세한 내용은 아래 참고를 참조하세요.
endingOffsetsByTimestamp 는 .보다 endingOffsets우선합니다.

기본값: 없음
endingTimestamp

형식: String 쿼리 형식: 일괄 처리만

이후 타임스탬프의 문자열 값(밀리초)
1970-01-01 00:00:00 UTC예를 들면 다음과 같습니다 "1686444353000". Kafka가 일치하는 오프셋을 반환하지 않으면 오프셋이 최신으로 설정됩니다. 타임스탬프가 있는 동작에 대한 자세한 내용은 아래 참고를 참조하세요. 참고: endingTimestamp 우선 순위 endingOffsetsByTimestamp
endingOffsets.

기본값: 없음
includeHeaders

형식: Boolean 쿼리 형식: 스트리밍 및 일괄 처리

행에 Kafka 헤더를 포함할지 여부입니다.

기본값: false
kafka.<consumer_option>

형식: String 쿼리 형식: 스트리밍 및 일괄 처리

모든 Kafka 소비자별 옵션은 접두사를 사용하여 kafka. 전달할 수 있습니다. 이러한 옵션은 제공될 때 백틱으로 묶어야 합니다. 그렇지 않으면 파서 오류가 발생합니다. Kafka 설명서에서 옵션을 찾을 수 있습니다.

참고: 이 함수를 사용하여 다음 옵션을 설정하면 안 됩니다.
key.deserializer, value.deserializer, bootstrap.serversgroup.id

기본값: 없음
maxOffsetsPerTrigger

형식: Long 쿼리 형식: 스트리밍만

트리거 간격당 처리되는 최대 오프셋 수 또는 행 수에 대한 속도 제한입니다. 지정된 총 오프셋 수는 TopicPartitions 간에 비례적으로 분할됩니다.

기본값: 없음
startingOffsets

형식: String 쿼리 형식: 스트리밍 및 일괄 처리

쿼리의 시작 지점은 "earliest" 가장 이른 오프셋에서 시작하거나, "latest" 가장 최근 오프셋에서만 시작하거나, 또는 각 TopicPartition의 시작 오프셋을 지정하는 JSON 문자열일 수 있습니다. JSON에서, -2는 가장 빠른 값을, -1는 가장 최신 값을 참조하는 오프셋으로 사용할 수 있습니다.

참고: 일괄 처리 쿼리의 경우 최신(암시적으로 또는 JSON에서 -1 사용)은 허용되지 않습니다. 스트리밍 쿼리의 경우 새 쿼리가 시작될 때만 적용됩니다. 다시 시작한 스트리밍 쿼리는 쿼리 검사점에서 정의된 오프셋에서 계속됩니다. 쿼리 중에 새로 검색된 파티션은 가장 일찍 시작됩니다.

기본값: "latest" 스트리밍의 경우, "earliest" 일괄 처리의 경우
startingOffsetsByTimestamp

형식: String 쿼리 형식: 스트리밍 및 일괄 처리

각 TopicPartition에 대한 시작 타임스탬프를 지정하는 JSON 문자열입니다. 타임스탬프는 타임스탬프의 긴 값(예1970-01-01 00:00:00 UTC: 밀리초1686444353000)으로 제공되어야 합니다. 타임스탬프가 있는 동작에 대한 자세한 내용은 아래 참고를 참조하세요. Kafka가 일치하는 오프셋을 반환하지 않으면 동작은 옵션 startingOffsetsByTimestampStrategy값에 따릅니다.
startingOffsetsByTimestamp 는 .보다 startingOffsets우선합니다.

참고: 스트리밍 쿼리의 경우 새 쿼리가 시작될 때만 적용됩니다. 다시 시작한 스트리밍 쿼리는 쿼리 검사점에서 정의된 오프셋에서 계속됩니다. 쿼리 중에 새로 검색된 파티션은 가장 일찍 시작됩니다.

기본값: 없음
startingOffsetsByTimestampStrategy

형식: String 쿼리 형식: 스트리밍 및 일괄 처리

이 전략은 타임스탬프별 지정된 시작 오프셋(전역 또는 파티션당)이 반환된 Kafka 오프셋과 일치하지 않는 경우에 사용됩니다. 사용 가능한 전략은 다음과 같습니다.

- "error": 쿼리 실패
- "latest": Spark가 이후 마이크로 일괄 처리에서 이러한 파티션에서 최신 레코드를 읽을 수 있도록 이러한 파티션에 대한 최신 오프셋을 할당합니다.

기본값: "error"
startingTimestamp

형식: String 쿼리 형식: 스트리밍 및 일괄 처리

이후 타임스탬프의 문자열 값(밀리초)
1970-01-01 00:00:00 UTC예를 들면 다음과 같습니다 "1686444353000". 타임스탬프가 있는 동작에 대한 자세한 내용은 아래 참고를 참조하세요. Kafka가 일치하는 오프셋을 반환하지 않으면 동작은 옵션 startingOffsetsByTimestampStrategy값에 따릅니다.
startingTimestamp는 우선 startingOffsetsByTimestamp 순위를 하며 .startingOffsets

참고: 스트리밍 쿼리의 경우 새 쿼리가 시작될 때만 적용됩니다. 다시 시작한 스트리밍 쿼리는 쿼리 검사점에서 정의된 오프셋에서 계속됩니다. 쿼리 중에 새로 검색된 파티션이 가장 일찍 시작됩니다.

기본값: 없음

참고 항목

각 파티션에 대해 반환된 오프셋은 타임스탬프가 해당 파티션의 지정된 타임스탬프보다 크거나 같은 가장 빠른 오프셋입니다. Kafka가 일치하는 오프셋을 반환하지 않는 경우 동작은 옵션에 따라 다릅니다. 각 옵션에 대한 설명을 확인합니다.

Spark는 단순히 타임스탬프 정보를 KafkaConsumer.offsetsForTimes전달하며 값에 대한 해석이나 이유를 전달하지 않습니다. 자세한 내용은 KafkaConsumer.offsetsForTimes설명서를 참조하세요. 또한 여기서 타임스탬프의 의미는 Kafka 구성(log.message.timestamp.type)에 따라 달라질 수 있습니다. 자세한 내용은 Apache Kafka 설명서를 참조 하세요.