Delen via


read_kinesis functie met streamingtabelwaarde

Van toepassing op: vinkje als ja aan Databricks SQL vinkje als ja aan Databricks Runtime 13.3 LTS en hoger

Retourneert een tabel met records die zijn gelezen uit Kinesis uit een of meer streams.

Syntaxis

read_kinesis ( { parameter => value } [, ...] )

Argumenten

read_kinesisvereist aanroepen van benoemde parameters.

Het enige vereiste argument is streamName. Alle andere argumenten zijn optioneel.

De beschrijvingen van de argumenten zijn hier kort. Zie de Amazon Kinesis-documentatie voor meer informatie.

Er zijn verschillende verbindingsopties om verbinding te maken en te verifiëren met AWS. awsAccessKeyen awsSecretKey kan worden opgegeven in de functieargumenten met behulp van de geheime functie, handmatig in de argumenten instellen of geconfigureerd als omgevingsvariabelen zoals hieronder wordt aangegeven. roleArnroleSessionName, roleExternalIDkan ook worden gebruikt voor verificatie met AWS met behulp van exemplaarprofielen. Als geen van deze is opgegeven, wordt de standaardKETEN van de AWS-provider gebruikt.

Parameter Type Description
streamName STRING Vereiste, door komma's gescheiden lijst van een of meer kinesisstromen.
awsAccessKey STRING De AWS-toegangssleutel, indien van toepassing. Kan ook worden opgegeven via de verschillende opties die worden ondersteund via de standaardketen van de AWS-referentieprovider, inclusief omgevingsvariabelen (AWS_ACCESS_KEY_ID) en een bestand met referentieprofielen.
awsSecretKey STRING De geheime sleutel die overeenkomt met de toegangssleutel. Kan worden opgegeven in de argumenten of via de verschillende opties die worden ondersteund via de standaardketen van de AWS-referentieprovider, inclusief omgevingsvariabelen (AWS_SECRET_KEY of AWS_SECRET_ACCESS_KEY) en een bestand met referentiesprofielen.
roleArn STRING Amazon-resourcenaam van de rol die moet worden aangenomen bij het openen van Kinesis.
roleExternalId STRING Wordt gebruikt bij het delegeren van toegang tot het AWS-account.
roleSessionName STRING Naam van AWS-rolsessie.
stsEndpoint STRING Een eindpunt voor het aanvragen van referenties voor tijdelijke toegang.
region STRING Regio voor de streams die moeten worden opgegeven. De standaardwaarde is de lokaal opgeloste regio.
endpoint STRING regionaal eindpunt voor Kinesis-gegevensstromen. De standaardwaarde is de lokaal opgeloste regio.
initialPosition STRING Beginpositie voor lezen vanuit de stroom. Een van de volgende: 'latest' (standaard), 'trim_horizon', 'earliest', 'at_timestamp'.
consumerMode STRING Een van: 'polling' (standaard) of 'EFO' (enhanced-fan-out).
consumerName STRING De naam van de consument. Alle consumenten worden voorafgegaan door 'databricks_'. De standaardwaarde is een lege tekenreeks.
registerConsumerTimeoutInterval STRING de maximale time-out om te wachten totdat de Kinesis EFO-consument is geregistreerd bij de Kinesis-stroom voordat er een fout wordt gegenereerd. De standaardwaarde is '300s'.
requireConsumerDeregistration BOOLEAN true om de EFO-consument bij het beëindigen van query's ongedaan te maken. Standaard is false.
deregisterConsumerTimeoutInterval STRING De maximale time-out om te wachten totdat de Kinesis EFO-consument wordt gederegistereerd met de Kinesis-stream voordat er een fout wordt gegenereerd. De standaardwaarde is '300s'.
consumerRefreshInterval STRING Het interval waarmee de consument wordt gecontroleerd en vernieuwd. De standaardwaarde is '300s'.

De volgende argumenten worden gebruikt voor het beheren van de leesdoorvoer en latentie voor Kinesis:

Parameter Type Description
maxRecordsPerFetch INTEGER (>0) Optioneel, met een standaardwaarde van 10.000 records die per API-aanvraag naar Kinesis moeten worden gelezen.
maxFetchRate STRING Hoe snel u gegevens per shard kunt vooraf fetcheren. Een waarde tussen '1,0' en '2.0' die wordt gemeten in MB/s. De standaardwaarde is '1.0'.
minFetchPeriod STRING De maximale wachttijd tussen opeenvolgende voorfetchpogingen. De standaardwaarde is 400 ms.
maxFetchDuration STRING De maximale duur voor het bufferen van nieuwe gegevens. De standaardwaarde is '10s'.
fetchBufferSize STRING De hoeveelheid gegevens voor de volgende trigger. De standaardwaarde is '20 gb'.
shardsPerTask INTEGER (>0) Het aantal Kinesis-shards dat parallel per spark-taak vooraf moet worden uitgevoerd. De standaard is 5.
shardFetchinterval STRING Hoe vaak moet er worden gepeild naar resharding. De standaardwaarde is '1s'.
coalesceThresholdBlockSize INTEGER (>0) De drempel waarop automatische samenvoeging plaatsvindt. De standaardwaarde is 10.000.000.
coalesce BOOLEAN true om vooraf gemaakte aanvragen samen te voegen. De standaardwaarde is true.
coalesceBinSize INTEGER (>0) De geschatte blokgrootte na het samenvoegen. De standaardwaarde is 128.000.000.
reuseKinesisClient BOOLEAN true om de Kinesis-client die in de cache is opgeslagen, opnieuw te gebruiken. De standaardwaarde is true behalve op een PE-cluster.
clientRetries INTEGER (>0) Het aantal nieuwe pogingen in het scenario voor opnieuw proberen. De standaard is 5.

Retouren

Een tabel met Kinesis-records met het volgende schema:

Naam Gegevenstype Null-waarde toegestaan Standaard Beschrijving
partitionKey STRING Nee Een sleutel die wordt gebruikt voor het distribueren van gegevens tussen de shards van een stream. Alle gegevensrecords met dezelfde partitiesleutel worden gelezen uit dezelfde shard.
data BINARY Nee De nettolading van de kinesis-gegevens, base-64 gecodeerd.
stream STRING Nee De naam van de stream waaruit de gegevens zijn gelezen.
shardId STRING Nee Een unieke id voor de shard waaruit de gegevens zijn gelezen.
sequenceNumber BIGINT Nee De unieke id van de record in de shard.
approximateArrivalTimestamp TIMESTAMP Nee De geschatte tijd dat de record in de stream is ingevoegd.

De kolommen (stream, shardId, sequenceNumber) vormen een primaire sleutel.

Voorbeelden

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
        awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');