Función con valores de tabla de transmisión por secuencias read_kinesis
Se aplica a: Databricks SQL Databricks Runtime 13.3 LTS y versiones posteriores
Devuelve una tabla con registros leídos de Kinesis de una o varias secuencias.
Sintaxis
read_kinesis ( { parameter => value } [, ...] )
Argumentos
read_kinesis
requiere una invocación de parámetros con nombre.
El único argumento necesario es streamName
. Todos los demás argumentos son opcionales.
Las descripciones de los argumentos son breves aquí. Para obtener más información, vea la documentación de Amazon Kinesis.
Hay varias opciones de conexión para conectarse y autenticarse con AWS.
awsAccessKey
y awsSecretKey
se pueden especificar en los argumentos de función mediante la función secret, establecer manualmente en los argumentos o configurarse como variables de entorno, tal y como se indica a continuación.
roleArn
, roleExternalID
y roleSessionName
también se pueden usar para autenticarse con AWS mediante perfiles de instancia.
Si no se especifica ninguno de ellos, se usará la cadena de proveedores de AWS predeterminada.
Parámetro | Tipo | Descripción |
---|---|---|
streamName |
STRING |
Obligatorio: lista separada por comas de una o varias secuencias de Kinesis. |
awsAccessKey |
STRING |
La clave de acceso de AWS, si existe. También se puede especificar a través de las distintas opciones admitidas a través de la cadena del proveedor de credenciales predeterminada de AWS, incluyendo las variables de entorno (AWS_ACCESS_KEY_ID ) y un archivo de perfiles de credenciales. |
awsSecretKey |
STRING |
Clave secreta que corresponde a la clave de acceso. Se puede especificar en los argumentos o a través de las distintas opciones admitidas a través de la cadena de proveedores de credenciales predeterminada de AWS, incluyendo las variables de entorno (AWS_SECRET_KEY o AWS_SECRET_ACCESS_KEY ) y un archivo de perfiles de credenciales. |
roleArn |
STRING |
Nombre de recurso de Amazon del rol que se va a asumir al acceder a Kinesis. |
roleExternalId |
STRING |
Se usa al delegar el acceso a la cuenta de AWS. |
roleSessionName |
STRING |
Nombre de la sesión de rol de AWS. |
stsEndpoint |
STRING |
Punto de conexión para solicitar credenciales de acceso temporal. |
region |
STRING |
Región de las secuencias que se van a especificar. El valor predeterminado es la región resuelta localmente. |
endpoint |
STRING |
punto de conexión regional para flujos de datos de Kinesis. El valor predeterminado es la región resuelta localmente. |
initialPosition |
STRING |
Posición inicial para leer desde en la secuencia. Uno de: "latest" (valor predeterminado), "trim_horizon", "earliest" y "at_timestamp". |
consumerMode |
STRING |
Uno de: "polling" (valor predeterminado) o "EFO" (distribución ramificada mejorada). |
consumerName |
STRING |
Nombre del consumidor. Todos los consumidores tienen el prefijo "databricks_". El valor predeterminado es una cadena vacía. |
registerConsumerTimeoutInterval |
STRING |
tiempo de espera máximo para esperar a que el consumidor de Kinesis EFO se registre con la secuencia Kinesis antes de producir un error. El valor predeterminado es "300 s". |
requireConsumerDeregistration |
BOOLEAN |
true para anular el registro del consumidor de EFO al finalizar la consulta. El valor predeterminado es false . |
deregisterConsumerTimeoutInterval |
STRING |
Tiempo de espera máximo para esperar a que el consumidor de Kinesis EFO anule el registro con la secuencia Kinesis antes de producir un error. El valor predeterminado es "300 s". |
consumerRefreshInterval |
STRING |
Intervalo en el que se comprueba y actualiza el consumidor. El valor predeterminado es "300 s". |
Los argumentos siguientes se usan para controlar el rendimiento de lectura y la latencia de Kinesis:
Parámetro | Tipo | Descripción |
---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
Opcional, con el valor predeterminado de 10,000, los registros que se van a leer por solicitud de API a Kinesis. |
maxFetchRate |
STRING |
El valor de esta opción representa la velocidad de captura previa de datos de cada partición. Valor entre "1,0" y "2,0" medido en MB/s. El valor predeterminado es "1,0". |
minFetchPeriod |
STRING |
Tiempo de espera máximo entre intentos de captura previa consecutivos. El valor predeterminado es "400 ms". |
maxFetchDuration |
STRING |
Duración máxima para almacenar datos nuevos capturados previamente. El valor predeterminado es "10 s". |
fetchBufferSize |
STRING |
Cantidad de datos para el siguiente desencadenador. El valor predeterminado es "20 gb". |
shardsPerTask |
INTEGER (>0) |
El número de particiones de Kinesis que se van a capturar previamente en paralelo por tarea de Spark. El valor predeterminado es 5. |
shardFetchinterval |
STRING |
Frecuencia con la que sondear el particionamiento. El valor predeterminado es "1 s". |
coalesceThresholdBlockSize |
INTEGER (>0) |
Umbral en el que se produce la fusión automática. El valor predeterminado es 10 000 000. |
coalesce |
BOOLEAN |
true para fusionar solicitudes previamente capturadas. El valor predeterminado es true . |
coalesceBinSize |
INTEGER (>0) |
Tamaño aproximado del bloque después de la fusión. El valor predeterminado es 128 000 000. |
reuseKinesisClient |
BOOLEAN |
true para reutilizar el cliente de Kinesis almacenado en la memoria caché. El valor predeterminado es true , excepto en un clúster de PE. |
clientRetries |
INTEGER (>0) |
Número de reintentos en el escenario de reintento. El valor predeterminado es 5. |
Devoluciones
Tabla de registros de Kinesis con el esquema siguiente:
Nombre | Tipo de datos | Nullable | Estándar | Descripción |
---|---|---|---|---|
partitionKey |
STRING |
No | Clave que se usa para distribuir datos entre las particiones de una secuencia. Todos los registros de datos con la misma clave de partición se leerán desde la misma partición. | |
data |
BINARY |
No | Carga de datos de kinesis, codificada en base-64. | |
stream |
STRING |
No | Nombre de la secuencia desde la que se leyeron los datos. | |
shardId |
STRING |
No | Identificador único de la partición desde la que se leyeron los datos. | |
sequenceNumber |
BIGINT |
No | Identificador único del registro dentro de su partición. | |
approximateArrivalTimestamp |
TIMESTAMP |
No | La hora aproximada en la que se insertó el registro en la secuencia. |
Las columnas (stream, shardId, sequenceNumber)
constituyen una clave principal.
Ejemplos
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');