Compartir a través de


Función con valores de tabla de transmisión por secuencias read_kinesis

Se aplica a: casilla marcada como sí Databricks SQL casilla marcada como sí Databricks Runtime 13.3 LTS y versiones posteriores

Devuelve una tabla con registros leídos de Kinesis de una o varias secuencias.

Sintaxis

read_kinesis ( { parameter => value } [, ...] )

Argumentos

read_kinesis requiere una invocación de parámetros con nombre.

El único argumento necesario es streamName. Todos los demás argumentos son opcionales.

Las descripciones de los argumentos son breves aquí. Para obtener más información, vea la documentación de Amazon Kinesis.

Hay varias opciones de conexión para conectarse y autenticarse con AWS. awsAccessKey y awsSecretKey se pueden especificar en los argumentos de función mediante la función secret, establecer manualmente en los argumentos o configurarse como variables de entorno, tal y como se indica a continuación. roleArn, roleExternalID y roleSessionName también se pueden usar para autenticarse con AWS mediante perfiles de instancia. Si no se especifica ninguno de ellos, se usará la cadena de proveedores de AWS predeterminada.

Parámetro Tipo Descripción
streamName STRING Obligatorio: lista separada por comas de una o varias secuencias de Kinesis.
awsAccessKey STRING La clave de acceso de AWS, si existe. También se puede especificar a través de las distintas opciones admitidas a través de la cadena del proveedor de credenciales predeterminada de AWS, incluyendo las variables de entorno (AWS_ACCESS_KEY_ID) y un archivo de perfiles de credenciales.
awsSecretKey STRING Clave secreta que corresponde a la clave de acceso. Se puede especificar en los argumentos o a través de las distintas opciones admitidas a través de la cadena de proveedores de credenciales predeterminada de AWS, incluyendo las variables de entorno (AWS_SECRET_KEY o AWS_SECRET_ACCESS_KEY) y un archivo de perfiles de credenciales.
roleArn STRING Nombre de recurso de Amazon del rol que se va a asumir al acceder a Kinesis.
roleExternalId STRING Se usa al delegar el acceso a la cuenta de AWS.
roleSessionName STRING Nombre de la sesión de rol de AWS.
stsEndpoint STRING Punto de conexión para solicitar credenciales de acceso temporal.
region STRING Región de las secuencias que se van a especificar. El valor predeterminado es la región resuelta localmente.
endpoint STRING punto de conexión regional para flujos de datos de Kinesis. El valor predeterminado es la región resuelta localmente.
initialPosition STRING Posición inicial para leer desde en la secuencia. Uno de: "latest" (valor predeterminado), "trim_horizon", "earliest" y "at_timestamp".
consumerMode STRING Uno de: "polling" (valor predeterminado) o "EFO" (distribución ramificada mejorada).
consumerName STRING Nombre del consumidor. Todos los consumidores tienen el prefijo "databricks_". El valor predeterminado es una cadena vacía.
registerConsumerTimeoutInterval STRING tiempo de espera máximo para esperar a que el consumidor de Kinesis EFO se registre con la secuencia Kinesis antes de producir un error. El valor predeterminado es "300 s".
requireConsumerDeregistration BOOLEAN true para anular el registro del consumidor de EFO al finalizar la consulta. El valor predeterminado es false.
deregisterConsumerTimeoutInterval STRING Tiempo de espera máximo para esperar a que el consumidor de Kinesis EFO anule el registro con la secuencia Kinesis antes de producir un error. El valor predeterminado es "300 s".
consumerRefreshInterval STRING Intervalo en el que se comprueba y actualiza el consumidor. El valor predeterminado es "300 s".

Los argumentos siguientes se usan para controlar el rendimiento de lectura y la latencia de Kinesis:

Parámetro Tipo Descripción
maxRecordsPerFetch INTEGER (>0) Opcional, con el valor predeterminado de 10,000, los registros que se van a leer por solicitud de API a Kinesis.
maxFetchRate STRING El valor de esta opción representa la velocidad de captura previa de datos de cada partición. Valor entre "1,0" y "2,0" medido en MB/s. El valor predeterminado es "1,0".
minFetchPeriod STRING Tiempo de espera máximo entre intentos de captura previa consecutivos. El valor predeterminado es "400 ms".
maxFetchDuration STRING Duración máxima para almacenar datos nuevos capturados previamente. El valor predeterminado es "10 s".
fetchBufferSize STRING Cantidad de datos para el siguiente desencadenador. El valor predeterminado es "20 gb".
shardsPerTask INTEGER (>0) El número de particiones de Kinesis que se van a capturar previamente en paralelo por tarea de Spark. El valor predeterminado es 5.
shardFetchinterval STRING Frecuencia con la que sondear el particionamiento. El valor predeterminado es "1 s".
coalesceThresholdBlockSize INTEGER (>0) Umbral en el que se produce la fusión automática. El valor predeterminado es 10 000 000.
coalesce BOOLEAN true para fusionar solicitudes previamente capturadas. El valor predeterminado es true.
coalesceBinSize INTEGER (>0) Tamaño aproximado del bloque después de la fusión. El valor predeterminado es 128 000 000.
reuseKinesisClient BOOLEAN true para reutilizar el cliente de Kinesis almacenado en la memoria caché. El valor predeterminado es true, excepto en un clúster de PE.
clientRetries INTEGER (>0) Número de reintentos en el escenario de reintento. El valor predeterminado es 5.

Devoluciones

Tabla de registros de Kinesis con el esquema siguiente:

Nombre Tipo de datos Nullable Estándar Descripción
partitionKey STRING No Clave que se usa para distribuir datos entre las particiones de una secuencia. Todos los registros de datos con la misma clave de partición se leerán desde la misma partición.
data BINARY No Carga de datos de kinesis, codificada en base-64.
stream STRING No Nombre de la secuencia desde la que se leyeron los datos.
shardId STRING No Identificador único de la partición desde la que se leyeron los datos.
sequenceNumber BIGINT No Identificador único del registro dentro de su partición.
approximateArrivalTimestamp TIMESTAMP No La hora aproximada en la que se insertó el registro en la secuencia.

Las columnas (stream, shardId, sequenceNumber) constituyen una clave principal.

Ejemplos

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
        awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');