Partager via


Fonctions table de diffusion en continu read_kinesis

S’applique à : coche marquée oui Databricks SQL oui coché Databricks Runtime 13.3 LTS et ultérieur

Retourne une table avec des enregistrements lus à partir de Kinesis d’un ou plusieurs flux.

Syntaxe

read_kinesis ( { parameter => value } [, ...] )

Arguments

read_kinesis nécessite un appel de paramètre nommé.

Le seul argument obligatoire est streamName. Tous les autres arguments sont facultatifs.

Les descriptions des arguments sont brèves ici. Pour obtenir des informations plus détaillées, consultez la documentation Amazon Kinesis.

Il existe différentes options de connexion pour se connecter et s’authentifier auprès d’AWS. awsAccessKey et awsSecretKey peuvent être spécifiés dans les arguments de fonction en utilisant la fonction secret, définis manuellement dans les arguments ou configurés en tant que variables d’environnement, comme indiqué ci-dessous. roleArn, roleExternalID, roleSessionName peuvent également être utilisés pour s’authentifier auprès d’AWS en utilisant des profils d’instance. Si aucun de ces éléments n’est spécifié, il utilise la chaîne de fournisseur AWS par défaut.

Paramètre Type Description
streamName STRING Obligatoire, liste séparée par des virgules d’un ou plusieurs flux Kinesis.
awsAccessKey STRING Clé d’accès AWS, le cas échéant. Peut également être spécifiée via les différentes options prises en charge par le biais de la chaîne de fournisseurs d’informations d’identification par défaut AWS, notamment les variables d’environnement (AWS_ACCESS_KEY_ID) et un fichier de profils d’informations d’identification.
awsSecretKey STRING Clé secrète qui correspond à la clé d’accès. Peut être spécifiée dans l’argument ou via les différentes options prises en charge par le biais de la chaîne de fournisseurs d’informations d’identification par défaut AWS, notamment les variables d’environnement (AWS_SECRET_KEY ou AWS_SECRET_ACCESS_KEY) et un fichier de profils d’informations d’identification.
roleArn STRING Nom de ressource Amazon du rôle à assumer lors de l’accès à Kinesis.
roleExternalId STRING Utilisé lors de la délégation de l’accès au compte AWS.
roleSessionName STRING Nom de session de rôle AWS.
stsEndpoint STRING Point de terminaison pour demander des informations d’identification d’accès temporaire.
region STRING Région pour que les flux à spécifier. La valeur par défaut est la région résolue localement.
endpoint STRING Point de terminaison régional pour des flux de données Kinesis. La valeur par défaut est la région résolue localement.
initialPosition STRING Position de départ pour la lecture à partir du flux. L’un des éléments suivants : « le dernier » (valeur par défaut), « trim_horizon », « le plus récent », « at_timestamp ».
consumerMode STRING L’un des éléments suivants : « interrogation » (par défaut) ou « EFO » (enhanced-fan-out).
consumerName STRING Nom du consommateur. Tous les consommateurs sont précédés du préfixe « databricks_ ». La valeur par défaut est une chaîne vide.
registerConsumerTimeoutInterval STRING délai d’attente maximal pour attendre que le consommateur Kinesis EFO soit inscrit auprès du flux Kinesis avant de générer une erreur. La valeur par défaut est « 300s ».
requireConsumerDeregistration BOOLEAN true pour désinscrire le consommateur EFO lors de l’arrêt de la requête. La valeur par défaut est false.
deregisterConsumerTimeoutInterval STRING Délai d’attente maximal pour attendre que le consommateur Kinesis EFO soit désinscrit auprès du flux Kinesis avant de générer une erreur. La valeur par défaut est « 300s ».
consumerRefreshInterval STRING Intervalle auquel le consommateur est vérifié et actualisé. La valeur par défaut est « 300s ».

Les arguments suivants sont utilisés pour contrôler le débit de lecture et la latence pour Kinesis :

Paramètre Type Description
maxRecordsPerFetch INTEGER (>0) Facultatif, avec la valeur par défaut de 10 000, enregistrements à lire par requête d’API à Kinesis.
maxFetchRate STRING Vitesse de pré-récupération des données par partition. Une valeur comprise entre « 1,0 » et « 2,0 », mesurée en Mo/s. La valeur par défaut est « 1.0 ».
minFetchPeriod STRING Temps d’attente maximal entre des tentatives de préversion consécutives. La valeur par défaut est « 400ms ».
maxFetchDuration STRING La durée maximale de mise en mémoire tampon de nouvelles données prérécupérées. La valeur par défaut est « 10s ».
fetchBufferSize STRING Quantité de données pour le déclencheur suivant. La valeur par défaut est « 20gb ».
shardsPerTask INTEGER (>0) Nombre de partitions Kinesis à prérécupérer en parallèle par tâche Spark. La valeur par défaut est 5.
shardFetchinterval STRING Fréquence d’interrogation pour un repartitionnement. La valeur par défaut est « 1s ».
coalesceThresholdBlockSize INTEGER (>0) Seuil auquel se produit la fusion automatique. La valeur par défaut est 10,000,000.
coalesce BOOLEAN true pour fusionner des requêtes prérécupérées. Par défaut, il s’agit de true.
coalesceBinSize INTEGER (>0) Taille approximative du bloc après la fusion. La valeur par défaut est 128,000,000.
reuseKinesisClient BOOLEAN true pour réutiliser le client Kinesis stocké dans le cache. La valeur par défaut est true sauf sur un cluster PE.
clientRetries INTEGER (>0) Nombre de nouvelles tentatives dans le scénario de nouvelle tentative. La valeur par défaut est 5.

Retours

Table d’enregistrements Kinesis avec le schéma suivant :

Name Type de données Nullable Standard Description
partitionKey STRING Non Clé utilisée pour distribuer des données entre les partitions d’un flux. Tous les enregistrements de données avec la même clé de partition sont lus à partir de la même partition.
data BINARY Non Charge utile de données Kinesis, encodée en base 64.
stream STRING Non Nom du flux à partir duquel les données ont été lues.
shardId STRING Non Identificateur unique de la partition à partir de laquelle les données ont été lues.
sequenceNumber BIGINT Non Identificateur unique de l’enregistrement dans sa partition.
approximateArrivalTimestamp TIMESTAMP Non Heure approximative à laquelle l’enregistrement a été inséré dans le flux.

Les colonnes (stream, shardId, sequenceNumber) constituent une clé primaire.

Exemples

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
        awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');