read_kinesis
-Streamingtabellenwertfunktionen
Gilt für: Databricks SQL Databricks Runtime 13.3 LTS und höher
Gibt eine Tabelle mit Datensätzen zurück, die aus Kinesis aus einem oder mehreren Streams gelesen werden.
Syntax
read_kinesis ( { parameter => value } [, ...] )
Argumente
read_kinesis
erfordert einen benannten Parameteraufruf.
Das einzige erforderliche Argument ist streamName
. Alle anderen Argumente sind optional.
Hier werden die Argumente nur kurz beschrieben. Weitere Details finden Sie in der Dokumentation zu Amazon Kinesis.
Es gibt verschiedene Optionen, um eine Verbindung mit AWS herzustellen und sich bei AWS zu authentifizieren.
awsAccessKey
und awsSecretKey
können entweder mit der Funktion „secret“ in den Funktionsargumenten angegeben, manuell in den Argumenten festgelegt oder wie unten angegeben als Umgebungsvariablen konfiguriert werden.
roleArn
, roleExternalID
und roleSessionName
können auch verwendet werden, um sich mithilfe von Instanzprofilen bei AWS zu authentifizieren.
Wenn keine dieser Angaben vorhanden ist, wird die AWS-Standardanbieterkette verwendet.
Parameter | Typ | Beschreibung |
---|---|---|
streamName |
STRING |
Erforderliche, durch Kommas getrennte Liste mit mindestens einem Kinesis-Stream. |
awsAccessKey |
STRING |
Der AWS-Zugriffsschlüssel, falls vorhanden. Kann auch über die verschiedenen Optionen angegeben werden, die über die AWS-Standardanbieterkette für Anmeldeinformationen unterstützt werden, beispielsweise über Umgebungsvariablen (AWS_ACCESS_KEY_ID ) oder eine Datei mit Anmeldeinformationsprofilen. |
awsSecretKey |
STRING |
Der geheime Schlüssel, der dem Zugriffsschlüssel entspricht. Kann in den Argumenten oder über die verschiedenen Optionen angegeben werden, die über die AWS-Standardanbieterkette für Anmeldeinformationen unterstützt werden, beispielsweise über Umgebungsvariablen (AWS_SECRET_KEY oder AWS_SECRET_ACCESS_KEY ) oder eine Datei mit Anmeldeinformationsprofilen. |
roleArn |
STRING |
Der Amazon-Ressourcenname der Rolle, die beim Zugriff auf Kinesis angenommen werden soll. |
roleExternalId |
STRING |
Wird beim Delegieren des Zugriffs auf das AWS-Konto verwendet. |
roleSessionName |
STRING |
Name der AWS-Rollensitzung. |
stsEndpoint |
STRING |
Ein Endpunkt zum Anfordern temporärer Anmeldeinformationen für den Zugriff. |
region |
STRING |
Bereich für die anzugebenden Streams. Standardwert: die lokal aufgelöste Region. |
endpoint |
STRING |
Regionaler Endpunkt für Kinesis-Datenströme. Standardwert: die lokal aufgelöste Region. |
initialPosition |
STRING |
Startposition für das Lesen aus dem Datenstrom. Einer dieser Werte: „latest“ (Standard), „trim_horizon“, „earliest“, „at_timestamp“. |
consumerMode |
STRING |
Einer dieser Werte: „polling“ (Standard) oder „EFO“ (enhanced-fan-out). |
consumerName |
STRING |
Der Name des Consumers. Allen Consumern ist „databricks_“ vorangestellt. Der Standardwert ist eine leere Zeichenfolge. |
registerConsumerTimeoutInterval |
STRING |
Der maximale Timeoutwert für die Zeitspanne, die abgewartet wird, bis der Kinesis-EFO-Consumer beim Kinesis-Datenstrom registriert wurde, bevor ein Fehler ausgelöst wird. Standardwert: 300 s. |
requireConsumerDeregistration |
BOOLEAN |
true zum Aufheben der Registrierung des EFO-Consumers bei Beendigung der Abfrage. Der Standardwert ist false . |
deregisterConsumerTimeoutInterval |
STRING |
Der maximale Timeoutwert für die Zeitspanne, die abgewartet wird, bis die Registrierung des Kinesis-EFO-Consumer beim Kinesis-Datenstrom aufgehoben wurde, bevor ein Fehler ausgelöst wird. Standardwert: 300 s. |
consumerRefreshInterval |
STRING |
Das Intervall, in dem der Consumer überprüft und aktualisiert wird. Standardwert: 300 s. |
Die folgenden Argumente werden verwendet, um den Lesedurchsatz und die Leselatenz für Kinesis zu steuern:
Parameter | Typ | Beschreibung |
---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
Optional, mit dem Standardwert von 10 000 Datensätzen, die pro API-Anforderung an Kinesis gelesen werden sollen. |
maxFetchRate |
STRING |
Wie schnell Daten pro Shard vorab abgerufen werden. Ein Wert zwischen „1,0“ und „2,0“, der in MB/s gemessen wird. Standardwert: 1,0. |
minFetchPeriod |
STRING |
Die maximale Wartezeit zwischen aufeinanderfolgenden Versuchen eines Vorababrufs. Standardwert: 400 ms. |
maxFetchDuration |
STRING |
Die maximale Dauer zum Puffern vorab abgerufener neuer Daten. Standardwert: 10 s. |
fetchBufferSize |
STRING |
Die Datenmenge für den nächsten Trigger. Standardwert: 20 GB. |
shardsPerTask |
INTEGER (>0) |
Die Anzahl von Kinesis-Shards, die pro Spark-Aufgabe parallel vorab abgerufen werden sollen. Der Standard ist 5. |
shardFetchinterval |
STRING |
Anzahl der Pollingvorgänge zum Resharding. Standardwert: 1 s. |
coalesceThresholdBlockSize |
INTEGER (>0) |
Der Schwellenwert, bei dem eine automatische Zusammenfügung erfolgt. Standardwert: 10.000.000. |
coalesce |
BOOLEAN |
true zum Zusammenfügen vorab abgerufener Anforderungen. Der Standardwert ist true . |
coalesceBinSize |
INTEGER (>0) |
Die ungefähre Blockgröße nach dem Zusammenfügen. Standardwert: 128.000.000. |
reuseKinesisClient |
BOOLEAN |
true zum Wiederverwenden des im Cache gespeicherten Kinesis-Clients. Standardwert: true , außer für einen PE-Cluster. |
clientRetries |
INTEGER (>0) |
Die Anzahl der Wiederholungsversuche im Wiederholungsszenario. Der Standard ist 5. |
Gibt zurück
Eine Tabelle mit Kinesis-Datensätzen mit dem folgenden Schema:
Name | Datentyp | Nullable | Standard | BESCHREIBUNG |
---|---|---|---|---|
partitionKey |
STRING |
Nein | Ein Schlüssel, der zum Verteilen von Daten auf die Shards eines Datenstroms verwendet wird. Alle Datensätze mit demselben Partitionsschlüssel werden aus demselben Shard gelesen. | |
data |
BINARY |
Nein | Die Kinesis-Datenpayload, Base64-codiert. | |
stream |
STRING |
Nein | Der Name des Datenstroms, aus dem die Daten gelesen wurden. | |
shardId |
STRING |
Nein | Ein eindeutiger Bezeichner für den Shard, aus dem die Daten gelesen wurden. | |
sequenceNumber |
BIGINT |
Nein | Der eindeutige Bezeichner des Datensatzes innerhalb des Shards. | |
approximateArrivalTimestamp |
TIMESTAMP |
Nein | Die ungefähre Zeit, zu der der Datensatz in den Datenstrom eingefügt wurde. |
Die Spalten (stream, shardId, sequenceNumber)
stellen einen Primärschlüssel dar.
Beispiele
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');