다음을 통해 공유


read_kinesis 스트리밍 테이블 반환 함수

적용 대상:예로 표시된 확인 Databricks SQL 예로 표시된 확인 Databricks Runtime 13.3 LTS 이상

Kinesis에서 하나 이상의 스트림에서 읽은 레코드를 포함하는 테이블을 반환합니다.

구문

read_kinesis ( { parameter => value } [, ...] )

인수

read_kinesis 에는 명명된 매개 변수 호출이 필요합니다.

유일한 필수 인수는 .입니다 streamName. 다른 모든 인수는 선택 사항입니다.

인수에 대한 설명은 여기에서 간략하게 설명합니다. 자세한 내용은 Amazon Kinesis 설명서를 참조하세요.

AWS에 연결하고 인증하는 다양한 연결 옵션이 있습니다. awsAccessKeyawsSecretKey비밀 함수사용하여 함수 인수에 지정하거나, 인수에 수동으로 설정하거나, 아래에 표시된 대로 환경 변수로 구성할 수 있습니다. roleArn, roleExternalIDroleSessionName 인스턴스 프로필을 사용하여 AWS를 인증하는 데 사용할 수도 있습니다. 이러한 공급자를 지정하지 않으면 기본 AWS 공급자 체인을 사용합니다.

매개 변수 형식 설명
streamName STRING 하나 이상의 키네시스 스트림의 필수 쉼표로 구분된 목록입니다.
awsAccessKey STRING AWS 액세스 키(있는 경우)입니다. 환경 변수(AWS_ACCESS_KEY_ID) 및 자격 증명 프로필 파일을 포함하여 AWS 기본 자격 증명 공급자 체인을 통해 지원되는 다양한 옵션을 통해 지정할 수도 있습니다.
awsSecretKey STRING 액세스 키에 해당하는 비밀 키입니다. 인수 또는 환경 변수(AWS_SECRET_KEY 또는 AWS_SECRET_ACCESS_KEY) 및 자격 증명 프로필 파일을 포함하여 AWS 기본 자격 증명 공급자 체인을 통해 지원되는 다양한 옵션을 통해 지정할 수 있습니다.
roleArn STRING Kinesis에 액세스할 때 가정할 역할의 Amazon 리소스 이름입니다.
roleExternalId STRING AWS 계정에 대한 액세스를 위임할 때 사용됩니다.
roleSessionName STRING AWS 역할 세션 이름입니다.
stsEndpoint STRING 임시 액세스 자격 증명을 요청하기 위한 엔드포인트입니다.
region STRING 지정할 스트림의 지역입니다. 기본값은 로컬로 확인된 지역입니다.
endpoint STRING Kinesis 데이터 스트림에 대한 지역별 엔드포인트입니다. 기본값은 로컬로 확인된 지역입니다.
initialPosition STRING 스트림에서 읽기 위한 시작 위치입니다. 'latest'(기본값), 'trim_horizon', 'earliest', 'at_timestamp' 중 하나입니다.
consumerMode STRING '폴링'(기본값) 또는 'EFO'(고급 팬아웃) 중 하나입니다.
consumerName STRING 소비자의 이름입니다. 모든 소비자는 접두사로 'databricks_'을 갖습니다. 기본값은 빈 문자열입니다.
registerConsumerTimeoutInterval STRING Kinesis EFO 소비자가 Kinesis 스트림에 등록될 때까지 대기하는 최대 시간 제한입니다. 기본값은 '300s'입니다.
requireConsumerDeregistration BOOLEAN true 쿼리 종료에 EFO 소비자를 등록 취소합니다. 기본값은 false입니다.
deregisterConsumerTimeoutInterval STRING Kinesis EFO 소비자가 오류를 throw하기 전에 Kinesis 스트림으로 등록을 취소할 때까지 대기하는 최대 시간 제한입니다. 기본값은 '300s'입니다.
consumerRefreshInterval STRING 소비자가 검사되고 새로 고쳐지는 간격입니다. 기본값은 '300s'입니다.

Kinesis의 읽기 처리량 및 대기 시간을 제어하는 데 사용되는 인수는 다음과 같습니다.

매개 변수 형식 설명
maxRecordsPerFetch INTEGER (>0) 선택 사항으로, Kinesis에 대한 API 요청당 읽을 레코드는 기본값인 10,000개입니다.
maxFetchRate STRING 분할당 데이터를 프리페치하는 속도입니다. MB/s로 측정된 '1.0'과 '2.0' 사이의 값입니다. 기본값은 '1.0'입니다.
minFetchPeriod STRING 연속 프리페치 시도 사이의 최대 대기 시간입니다. 기본값은 '400ms'입니다.
maxFetchDuration STRING 프리페치된 새 데이터를 버퍼링할 최대 기간입니다. 기본값은 '10s'입니다.
fetchBufferSize STRING 다음 트리거에 대한 데이터 양입니다. 기본값은 '20gb'입니다.
shardsPerTask INTEGER (>0) Spark 작업당 병렬로 프리페치할 Kinesis 분할된 데이터베이스의 수입니다. 기본값은 5입니다.
shardFetchinterval STRING 분할을 폴링하는 빈도입니다. 기본값은 '1s'입니다.
coalesceThresholdBlockSize INTEGER (>0) 자동 병합이 발생하는 임계값입니다. 기본값은 10,000,000입니다.
coalesce BOOLEAN true 를 사용하여 프리페치된 요청을 병합합니다. 기본값은 true입니다.
coalesceBinSize INTEGER (>0) 병합 후 대략적인 블록 크기입니다. 기본값은 128,000,000입니다.
reuseKinesisClient BOOLEAN true 캐시에 저장된 Kinesis 클라이언트를 다시 사용합니다. 기본값은 true PE 클러스터를 제외한 것입니다.
clientRetries INTEGER (>0) 재시도 시나리오의 재시도 횟수입니다. 기본값은 5입니다.

반품

Kinesis 레코드의 테이블은 다음 스키마를 가지고 있습니다.

속성 데이터 형식 Nullable Standard 설명
partitionKey STRING 아니요 스트림의 분할된 데이터베이스 간에 데이터를 배포하는 데 사용되는 키입니다. 동일한 파티션 키를 가진 모든 데이터 레코드는 동일한 샤드에서 읽힙니다.
data BINARY 아니요 base-64로 인코딩된 kinesis 데이터 페이로드입니다.
stream STRING 아니요 데이터를 읽은 스트림의 이름입니다.
shardId STRING 아니요 데이터를 읽은 샤드의 고유 식별자입니다.
sequenceNumber BIGINT 아니요 샤드 내 레코드의 고유 식별자입니다.
approximateArrivalTimestamp TIMESTAMP 아니요 레코드가 스트림에 삽입된 대략적인 시간입니다.

(stream, shardId, sequenceNumber)은 기본 키를 구성합니다.

예제

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
        awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');