Dela via


read_kinesis strömmande tabellvärdesfunktion

Gäller för: markerad ja Databricks SQL markerad ja Databricks Runtime 13.3 LTS och senare

Returnerar en tabell med poster som lästs från Kinesis från en eller flera strömmar.

Syntax

read_kinesis ( { parameter => value } [, ...] )

Argument

read_kinesis kräver namngiven parameteranrop.

Det enda argumentet som krävs är streamName. Alla andra argument är valfria.

Beskrivningarna av argumenten är korta här. Mer information finns i Amazon Kinesis-dokumentationen.

Det finns olika anslutningsalternativ för att ansluta och autentisera med AWS. awsAccessKey, och awsSecretKey kan antingen anges i funktionsargumenten med hjälp av den hemliga funktionen, anges manuellt i argumenten eller konfigureras som miljövariabler enligt nedan. roleArn, roleExternalID, roleSessionName kan också användas för att autentisera med AWS med hjälp av instansprofiler. Om inget av dessa anges använder den standardkedjan för AWS-providern.

Parameter Typ Beskrivning
streamName STRING Obligatorisk, kommaavgränsad lista över en eller flera kinesis-strömmar.
awsAccessKey STRING AWS-åtkomstnyckeln, om den finns. Kan också anges via de olika alternativ som stöds via AWS standardleverantörskedja för autentiseringsuppgifter, inklusive miljövariabler (AWS_ACCESS_KEY_ID) och en profilfil för autentiseringsuppgifter.
awsSecretKey STRING Den hemliga nyckel som motsvarar åtkomstnyckeln. Kan anges antingen i argumenten eller via de olika alternativ som stöds via AWS standardleverantörskedja för autentiseringsuppgifter, inklusive miljövariabler (AWS_SECRET_KEY eller AWS_SECRET_ACCESS_KEY) och en profilfil för autentiseringsuppgifter.
roleArn STRING Amazon-resursnamnet för rollen som ska antas vid åtkomst till Kinesis.
roleExternalId STRING Används vid delegering av åtkomst till AWS-kontot.
roleSessionName STRING Namn på AWS-rollsession.
stsEndpoint STRING En slutpunkt för att begära autentiseringsuppgifter för tillfällig åtkomst.
region STRING Region för de strömmar som ska anges. Standardvärdet är den lokalt lösta regionen.
endpoint STRING regional slutpunkt för Kinesis-dataströmmar. Standardvärdet är den lokalt lösta regionen.
initialPosition STRING Startposition för läsning från strömmen. En av: "senaste" (standard), "trim_horizon", "tidigast", "at_timestamp".
consumerMode STRING En av: "polling" (standard) eller "EFO" (enhanced--out).
consumerName STRING Namnet på konsumenten. Alla konsumenter har prefixet "databricks_". Standardvärdet är en tom sträng.
registerConsumerTimeoutInterval STRING den maximala tidsgränsen för att vänta tills Kinesis EFO-konsumenten har registrerats med Kinesis-strömmen innan ett fel uppstår. Standardvärdet är "300s".
requireConsumerDeregistration BOOLEAN true för att avregistrera EFO-konsumenten vid frågeavslut. Standard är false.
deregisterConsumerTimeoutInterval STRING Den maximala tidsgränsen för att vänta tills Kinesis EFO-konsumenten avregistreras med Kinesis-strömmen innan ett fel utlöses. Standardvärdet är "300s".
consumerRefreshInterval STRING Det intervall med vilket konsumenten kontrolleras och uppdateras. Standardvärdet är "300s".

Följande argument används för att kontrollera läsdataflödet och svarstiden för Kinesis:

Parameter Typ Beskrivning
maxRecordsPerFetch INTEGER (>0) Valfritt, med standardvärdet 10 000 poster som ska läsas per API-begäran till Kinesis.
maxFetchRate STRING Hur snabbt du förinstallerar data per shard. Ett värde mellan 1,0 och 2,0 som mäts i MB/s. Standardvärdet är "1.0".
minFetchPeriod STRING Den maximala väntetiden mellan efterföljande prefetch-försök. Standardvärdet är "400ms".
maxFetchDuration STRING Den maximala varaktigheten för buffring av förinstallerade nya data. Standardvärdet är "10s".
fetchBufferSize STRING Mängden data för nästa utlösare. Standardvärdet är "20gb".
shardsPerTask INTEGER (>0) Antalet Kinesis-shards som ska förberättas parallellt per spark-aktivitet. Standardinställningen är 5.
shardFetchinterval STRING Hur ofta du söker efter partitionering. Standardvärdet är "1s".
coalesceThresholdBlockSize INTEGER (>0) Det tröskelvärde vid vilket automatisk sammansning sker. Standardvärdet är 10 000 000.
coalesce BOOLEAN true för att sammanslås med förinställda begäranden. Standardvärdet är true.
coalesceBinSize INTEGER (>0) Den ungefärliga blockstorleken efter sammankoppling. Standardvärdet är 128 000 000.
reuseKinesisClient BOOLEAN true för att återanvända Kinesis-klienten som lagras i cacheminnet. Standardvärdet är true förutom i ett PE-kluster.
clientRetries INTEGER (>0) Antalet återförsök i återförsöksscenariot. Standardinställningen är 5.

Returer

En tabell med Kinesis-poster med följande schema:

Name Datatyp Kan ha värdet null Standard beskrivning
partitionKey STRING Nej En nyckel som används för att distribuera data mellan fragmenten i en dataström. Alla dataposter med samma partitionsnyckel kommer att läsas från samma fragment.
data BINARY Nej Kinesis-datanyttolasten, base-64-kodad.
stream STRING Nej Namnet på strömmen där data lästes från.
shardId STRING Nej En unik identifierare för fragmentet där data lästes från.
sequenceNumber BIGINT Nej Den unika identifieraren för posten i dess fragment.
approximateArrivalTimestamp TIMESTAMP Nej Den ungefärliga tid då posten infogades i dataströmmen.

Kolumnerna (stream, shardId, sequenceNumber) utgör en primärnyckel.

Exempel

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
        awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');