read_kinesis
strömmande tabellvärdesfunktion
Gäller för: Databricks SQL Databricks Runtime 13.3 LTS och senare
Returnerar en tabell med poster som lästs från Kinesis från en eller flera strömmar.
Syntax
read_kinesis ( { parameter => value } [, ...] )
Argument
read_kinesis
kräver namngiven parameteranrop.
Det enda argumentet som krävs är streamName
. Alla andra argument är valfria.
Beskrivningarna av argumenten är korta här. Mer information finns i Amazon Kinesis-dokumentationen.
Det finns olika anslutningsalternativ för att ansluta och autentisera med AWS.
awsAccessKey
, och awsSecretKey
kan antingen anges i funktionsargumenten med hjälp av den hemliga funktionen, anges manuellt i argumenten eller konfigureras som miljövariabler enligt nedan.
roleArn
, roleExternalID
, roleSessionName
kan också användas för att autentisera med AWS med hjälp av instansprofiler.
Om inget av dessa anges använder den standardkedjan för AWS-providern.
Parameter | Typ | Beskrivning |
---|---|---|
streamName |
STRING |
Obligatorisk, kommaavgränsad lista över en eller flera kinesis-strömmar. |
awsAccessKey |
STRING |
AWS-åtkomstnyckeln, om den finns. Kan också anges via de olika alternativ som stöds via AWS standardleverantörskedja för autentiseringsuppgifter, inklusive miljövariabler (AWS_ACCESS_KEY_ID ) och en profilfil för autentiseringsuppgifter. |
awsSecretKey |
STRING |
Den hemliga nyckel som motsvarar åtkomstnyckeln. Kan anges antingen i argumenten eller via de olika alternativ som stöds via AWS standardleverantörskedja för autentiseringsuppgifter, inklusive miljövariabler (AWS_SECRET_KEY eller AWS_SECRET_ACCESS_KEY ) och en profilfil för autentiseringsuppgifter. |
roleArn |
STRING |
Amazon-resursnamnet för rollen som ska antas vid åtkomst till Kinesis. |
roleExternalId |
STRING |
Används vid delegering av åtkomst till AWS-kontot. |
roleSessionName |
STRING |
Namn på AWS-rollsession. |
stsEndpoint |
STRING |
En slutpunkt för att begära autentiseringsuppgifter för tillfällig åtkomst. |
region |
STRING |
Region för de strömmar som ska anges. Standardvärdet är den lokalt lösta regionen. |
endpoint |
STRING |
regional slutpunkt för Kinesis-dataströmmar. Standardvärdet är den lokalt lösta regionen. |
initialPosition |
STRING |
Startposition för läsning från strömmen. En av: "senaste" (standard), "trim_horizon", "tidigast", "at_timestamp". |
consumerMode |
STRING |
En av: "polling" (standard) eller "EFO" (enhanced--out). |
consumerName |
STRING |
Namnet på konsumenten. Alla konsumenter har prefixet "databricks_". Standardvärdet är en tom sträng. |
registerConsumerTimeoutInterval |
STRING |
den maximala tidsgränsen för att vänta tills Kinesis EFO-konsumenten har registrerats med Kinesis-strömmen innan ett fel uppstår. Standardvärdet är "300s". |
requireConsumerDeregistration |
BOOLEAN |
true för att avregistrera EFO-konsumenten vid frågeavslut. Standard är false . |
deregisterConsumerTimeoutInterval |
STRING |
Den maximala tidsgränsen för att vänta tills Kinesis EFO-konsumenten avregistreras med Kinesis-strömmen innan ett fel utlöses. Standardvärdet är "300s". |
consumerRefreshInterval |
STRING |
Det intervall med vilket konsumenten kontrolleras och uppdateras. Standardvärdet är "300s". |
Följande argument används för att kontrollera läsdataflödet och svarstiden för Kinesis:
Parameter | Typ | Beskrivning |
---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
Valfritt, med standardvärdet 10 000 poster som ska läsas per API-begäran till Kinesis. |
maxFetchRate |
STRING |
Hur snabbt du förinstallerar data per shard. Ett värde mellan 1,0 och 2,0 som mäts i MB/s. Standardvärdet är "1.0". |
minFetchPeriod |
STRING |
Den maximala väntetiden mellan efterföljande prefetch-försök. Standardvärdet är "400ms". |
maxFetchDuration |
STRING |
Den maximala varaktigheten för buffring av förinstallerade nya data. Standardvärdet är "10s". |
fetchBufferSize |
STRING |
Mängden data för nästa utlösare. Standardvärdet är "20gb". |
shardsPerTask |
INTEGER (>0) |
Antalet Kinesis-shards som ska förberättas parallellt per spark-aktivitet. Standardinställningen är 5. |
shardFetchinterval |
STRING |
Hur ofta du söker efter partitionering. Standardvärdet är "1s". |
coalesceThresholdBlockSize |
INTEGER (>0) |
Det tröskelvärde vid vilket automatisk sammansning sker. Standardvärdet är 10 000 000. |
coalesce |
BOOLEAN |
true för att sammanslås med förinställda begäranden. Standardvärdet är true . |
coalesceBinSize |
INTEGER (>0) |
Den ungefärliga blockstorleken efter sammankoppling. Standardvärdet är 128 000 000. |
reuseKinesisClient |
BOOLEAN |
true för att återanvända Kinesis-klienten som lagras i cacheminnet. Standardvärdet är true förutom i ett PE-kluster. |
clientRetries |
INTEGER (>0) |
Antalet återförsök i återförsöksscenariot. Standardinställningen är 5. |
Returer
En tabell med Kinesis-poster med följande schema:
Name | Datatyp | Kan ha värdet null | Standard | beskrivning |
---|---|---|---|---|
partitionKey |
STRING |
Nej | En nyckel som används för att distribuera data mellan fragmenten i en dataström. Alla dataposter med samma partitionsnyckel kommer att läsas från samma fragment. | |
data |
BINARY |
Nej | Kinesis-datanyttolasten, base-64-kodad. | |
stream |
STRING |
Nej | Namnet på strömmen där data lästes från. | |
shardId |
STRING |
Nej | En unik identifierare för fragmentet där data lästes från. | |
sequenceNumber |
BIGINT |
Nej | Den unika identifieraren för posten i dess fragment. | |
approximateArrivalTimestamp |
TIMESTAMP |
Nej | Den ungefärliga tid då posten infogades i dataströmmen. |
Kolumnerna (stream, shardId, sequenceNumber)
utgör en primärnyckel.
Exempel
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');