read_kinesis
functie met streamingtabelwaarde
Van toepassing op: Databricks SQL Databricks Runtime 13.3 LTS en hoger
Retourneert een tabel met records die zijn gelezen uit Kinesis uit een of meer streams.
Syntaxis
read_kinesis ( { parameter => value } [, ...] )
Argumenten
read_kinesis
vereist aanroepen van benoemde parameters.
Het enige vereiste argument is streamName
. Alle andere argumenten zijn optioneel.
De beschrijvingen van de argumenten zijn hier kort. Zie de Amazon Kinesis-documentatie voor meer informatie.
Er zijn verschillende verbindingsopties om verbinding te maken en te verifiëren met AWS.
awsAccessKey
en awsSecretKey
kan worden opgegeven in de functieargumenten met behulp van de geheime functie, handmatig in de argumenten instellen of geconfigureerd als omgevingsvariabelen zoals hieronder wordt aangegeven.
roleArn
roleSessionName
, roleExternalID
kan ook worden gebruikt voor verificatie met AWS met behulp van exemplaarprofielen.
Als geen van deze is opgegeven, wordt de standaardKETEN van de AWS-provider gebruikt.
Parameter | Type | Description |
---|---|---|
streamName |
STRING |
Vereiste, door komma's gescheiden lijst van een of meer kinesisstromen. |
awsAccessKey |
STRING |
De AWS-toegangssleutel, indien van toepassing. Kan ook worden opgegeven via de verschillende opties die worden ondersteund via de standaardketen van de AWS-referentieprovider, inclusief omgevingsvariabelen (AWS_ACCESS_KEY_ID ) en een bestand met referentieprofielen. |
awsSecretKey |
STRING |
De geheime sleutel die overeenkomt met de toegangssleutel. Kan worden opgegeven in de argumenten of via de verschillende opties die worden ondersteund via de standaardketen van de AWS-referentieprovider, inclusief omgevingsvariabelen (AWS_SECRET_KEY of AWS_SECRET_ACCESS_KEY ) en een bestand met referentiesprofielen. |
roleArn |
STRING |
Amazon-resourcenaam van de rol die moet worden aangenomen bij het openen van Kinesis. |
roleExternalId |
STRING |
Wordt gebruikt bij het delegeren van toegang tot het AWS-account. |
roleSessionName |
STRING |
Naam van AWS-rolsessie. |
stsEndpoint |
STRING |
Een eindpunt voor het aanvragen van referenties voor tijdelijke toegang. |
region |
STRING |
Regio voor de streams die moeten worden opgegeven. De standaardwaarde is de lokaal opgeloste regio. |
endpoint |
STRING |
regionaal eindpunt voor Kinesis-gegevensstromen. De standaardwaarde is de lokaal opgeloste regio. |
initialPosition |
STRING |
Beginpositie voor lezen vanuit de stroom. Een van de volgende: 'latest' (standaard), 'trim_horizon', 'earliest', 'at_timestamp'. |
consumerMode |
STRING |
Een van: 'polling' (standaard) of 'EFO' (enhanced-fan-out). |
consumerName |
STRING |
De naam van de consument. Alle consumenten worden voorafgegaan door 'databricks_'. De standaardwaarde is een lege tekenreeks. |
registerConsumerTimeoutInterval |
STRING |
de maximale time-out om te wachten totdat de Kinesis EFO-consument is geregistreerd bij de Kinesis-stroom voordat er een fout wordt gegenereerd. De standaardwaarde is '300s'. |
requireConsumerDeregistration |
BOOLEAN |
true om de EFO-consument bij het beëindigen van query's ongedaan te maken. Standaard is false . |
deregisterConsumerTimeoutInterval |
STRING |
De maximale time-out om te wachten totdat de Kinesis EFO-consument wordt gederegistereerd met de Kinesis-stream voordat er een fout wordt gegenereerd. De standaardwaarde is '300s'. |
consumerRefreshInterval |
STRING |
Het interval waarmee de consument wordt gecontroleerd en vernieuwd. De standaardwaarde is '300s'. |
De volgende argumenten worden gebruikt voor het beheren van de leesdoorvoer en latentie voor Kinesis:
Parameter | Type | Description |
---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
Optioneel, met een standaardwaarde van 10.000 records die per API-aanvraag naar Kinesis moeten worden gelezen. |
maxFetchRate |
STRING |
Hoe snel u gegevens per shard kunt vooraf fetcheren. Een waarde tussen '1,0' en '2.0' die wordt gemeten in MB/s. De standaardwaarde is '1.0'. |
minFetchPeriod |
STRING |
De maximale wachttijd tussen opeenvolgende voorfetchpogingen. De standaardwaarde is 400 ms. |
maxFetchDuration |
STRING |
De maximale duur voor het bufferen van nieuwe gegevens. De standaardwaarde is '10s'. |
fetchBufferSize |
STRING |
De hoeveelheid gegevens voor de volgende trigger. De standaardwaarde is '20 gb'. |
shardsPerTask |
INTEGER (>0) |
Het aantal Kinesis-shards dat parallel per spark-taak vooraf moet worden uitgevoerd. De standaard is 5. |
shardFetchinterval |
STRING |
Hoe vaak moet er worden gepeild naar resharding. De standaardwaarde is '1s'. |
coalesceThresholdBlockSize |
INTEGER (>0) |
De drempel waarop automatische samenvoeging plaatsvindt. De standaardwaarde is 10.000.000. |
coalesce |
BOOLEAN |
true om vooraf gemaakte aanvragen samen te voegen. De standaardwaarde is true . |
coalesceBinSize |
INTEGER (>0) |
De geschatte blokgrootte na het samenvoegen. De standaardwaarde is 128.000.000. |
reuseKinesisClient |
BOOLEAN |
true om de Kinesis-client die in de cache is opgeslagen, opnieuw te gebruiken. De standaardwaarde is true behalve op een PE-cluster. |
clientRetries |
INTEGER (>0) |
Het aantal nieuwe pogingen in het scenario voor opnieuw proberen. De standaard is 5. |
Retouren
Een tabel met Kinesis-records met het volgende schema:
Naam | Gegevenstype | Null-waarde toegestaan | Standaard | Beschrijving |
---|---|---|---|---|
partitionKey |
STRING |
Nee | Een sleutel die wordt gebruikt voor het distribueren van gegevens tussen de shards van een stream. Alle gegevensrecords met dezelfde partitiesleutel worden gelezen uit dezelfde shard. | |
data |
BINARY |
Nee | De nettolading van de kinesis-gegevens, base-64 gecodeerd. | |
stream |
STRING |
Nee | De naam van de stream waaruit de gegevens zijn gelezen. | |
shardId |
STRING |
Nee | Een unieke id voor de shard waaruit de gegevens zijn gelezen. | |
sequenceNumber |
BIGINT |
Nee | De unieke id van de record in de shard. | |
approximateArrivalTimestamp |
TIMESTAMP |
Nee | De geschatte tijd dat de record in de stream is ingevoegd. |
De kolommen (stream, shardId, sequenceNumber)
vormen een primaire sleutel.
Voorbeelden
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');