Fonctions table de diffusion en continu read_kinesis
S’applique à : Databricks SQL Databricks Runtime 13.3 LTS et ultérieur
Retourne une table avec des enregistrements lus à partir de Kinesis d’un ou plusieurs flux.
Syntaxe
read_kinesis ( { parameter => value } [, ...] )
Arguments
read_kinesis
nécessite un appel de paramètre nommé.
Le seul argument obligatoire est streamName
. Tous les autres arguments sont facultatifs.
Les descriptions des arguments sont brèves ici. Pour obtenir des informations plus détaillées, consultez la documentation Amazon Kinesis.
Il existe différentes options de connexion pour se connecter et s’authentifier auprès d’AWS.
awsAccessKey
et awsSecretKey
peuvent être spécifiés dans les arguments de fonction en utilisant la fonction secret, définis manuellement dans les arguments ou configurés en tant que variables d’environnement, comme indiqué ci-dessous.
roleArn
, roleExternalID
, roleSessionName
peuvent également être utilisés pour s’authentifier auprès d’AWS en utilisant des profils d’instance.
Si aucun de ces éléments n’est spécifié, il utilise la chaîne de fournisseur AWS par défaut.
Paramètre | Type | Description |
---|---|---|
streamName |
STRING |
Obligatoire, liste séparée par des virgules d’un ou plusieurs flux Kinesis. |
awsAccessKey |
STRING |
Clé d’accès AWS, le cas échéant. Peut également être spécifiée via les différentes options prises en charge par le biais de la chaîne de fournisseurs d’informations d’identification par défaut AWS, notamment les variables d’environnement (AWS_ACCESS_KEY_ID ) et un fichier de profils d’informations d’identification. |
awsSecretKey |
STRING |
Clé secrète qui correspond à la clé d’accès. Peut être spécifiée dans l’argument ou via les différentes options prises en charge par le biais de la chaîne de fournisseurs d’informations d’identification par défaut AWS, notamment les variables d’environnement (AWS_SECRET_KEY ou AWS_SECRET_ACCESS_KEY ) et un fichier de profils d’informations d’identification. |
roleArn |
STRING |
Nom de ressource Amazon du rôle à assumer lors de l’accès à Kinesis. |
roleExternalId |
STRING |
Utilisé lors de la délégation de l’accès au compte AWS. |
roleSessionName |
STRING |
Nom de session de rôle AWS. |
stsEndpoint |
STRING |
Point de terminaison pour demander des informations d’identification d’accès temporaire. |
region |
STRING |
Région pour que les flux à spécifier. La valeur par défaut est la région résolue localement. |
endpoint |
STRING |
Point de terminaison régional pour des flux de données Kinesis. La valeur par défaut est la région résolue localement. |
initialPosition |
STRING |
Position de départ pour la lecture à partir du flux. L’un des éléments suivants : « le dernier » (valeur par défaut), « trim_horizon », « le plus récent », « at_timestamp ». |
consumerMode |
STRING |
L’un des éléments suivants : « interrogation » (par défaut) ou « EFO » (enhanced-fan-out). |
consumerName |
STRING |
Nom du consommateur. Tous les consommateurs sont précédés du préfixe « databricks_ ». La valeur par défaut est une chaîne vide. |
registerConsumerTimeoutInterval |
STRING |
délai d’attente maximal pour attendre que le consommateur Kinesis EFO soit inscrit auprès du flux Kinesis avant de générer une erreur. La valeur par défaut est « 300s ». |
requireConsumerDeregistration |
BOOLEAN |
true pour désinscrire le consommateur EFO lors de l’arrêt de la requête. La valeur par défaut est false . |
deregisterConsumerTimeoutInterval |
STRING |
Délai d’attente maximal pour attendre que le consommateur Kinesis EFO soit désinscrit auprès du flux Kinesis avant de générer une erreur. La valeur par défaut est « 300s ». |
consumerRefreshInterval |
STRING |
Intervalle auquel le consommateur est vérifié et actualisé. La valeur par défaut est « 300s ». |
Les arguments suivants sont utilisés pour contrôler le débit de lecture et la latence pour Kinesis :
Paramètre | Type | Description |
---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
Facultatif, avec la valeur par défaut de 10 000, enregistrements à lire par requête d’API à Kinesis. |
maxFetchRate |
STRING |
Vitesse de pré-récupération des données par partition. Une valeur comprise entre « 1,0 » et « 2,0 », mesurée en Mo/s. La valeur par défaut est « 1.0 ». |
minFetchPeriod |
STRING |
Temps d’attente maximal entre des tentatives de préversion consécutives. La valeur par défaut est « 400ms ». |
maxFetchDuration |
STRING |
La durée maximale de mise en mémoire tampon de nouvelles données prérécupérées. La valeur par défaut est « 10s ». |
fetchBufferSize |
STRING |
Quantité de données pour le déclencheur suivant. La valeur par défaut est « 20gb ». |
shardsPerTask |
INTEGER (>0) |
Nombre de partitions Kinesis à prérécupérer en parallèle par tâche Spark. La valeur par défaut est 5. |
shardFetchinterval |
STRING |
Fréquence d’interrogation pour un repartitionnement. La valeur par défaut est « 1s ». |
coalesceThresholdBlockSize |
INTEGER (>0) |
Seuil auquel se produit la fusion automatique. La valeur par défaut est 10,000,000. |
coalesce |
BOOLEAN |
true pour fusionner des requêtes prérécupérées. Par défaut, il s’agit de true . |
coalesceBinSize |
INTEGER (>0) |
Taille approximative du bloc après la fusion. La valeur par défaut est 128,000,000. |
reuseKinesisClient |
BOOLEAN |
true pour réutiliser le client Kinesis stocké dans le cache. La valeur par défaut est true sauf sur un cluster PE. |
clientRetries |
INTEGER (>0) |
Nombre de nouvelles tentatives dans le scénario de nouvelle tentative. La valeur par défaut est 5. |
Retours
Table d’enregistrements Kinesis avec le schéma suivant :
Name | Type de données | Nullable | Standard | Description |
---|---|---|---|---|
partitionKey |
STRING |
Non | Clé utilisée pour distribuer des données entre les partitions d’un flux. Tous les enregistrements de données avec la même clé de partition sont lus à partir de la même partition. | |
data |
BINARY |
Non | Charge utile de données Kinesis, encodée en base 64. | |
stream |
STRING |
Non | Nom du flux à partir duquel les données ont été lues. | |
shardId |
STRING |
Non | Identificateur unique de la partition à partir de laquelle les données ont été lues. | |
sequenceNumber |
BIGINT |
Non | Identificateur unique de l’enregistrement dans sa partition. | |
approximateArrivalTimestamp |
TIMESTAMP |
Non | Heure approximative à laquelle l’enregistrement a été inséré dans le flux. |
Les colonnes (stream, shardId, sequenceNumber)
constituent une clé primaire.
Exemples
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');