read_kinesis
Função com valor de tabela de streaming
Aplica-se a: Databricks SQL Databricks Runtime 13.3 LTS e superior
Retorna uma tabela com registros lidos do Kinesis de um ou mais fluxos.
Sintaxe
read_kinesis ( { parameter => value } [, ...] )
Argumentos
read_kinesis
requer invocação de parâmetro nomeado.
O único argumento necessário é streamName
. Todos os outros argumentos são opcionais.
As descrições dos argumentos são breves aqui. Para obter mais detalhes, consulte a documentação do Amazon Kinesis .
Há várias opções de conexão para se conectar e autenticar com a AWS.
awsAccessKey
, e awsSecretKey
pode ser especificado nos argumentos da função usando a função secreta, definida manualmente nos argumentos ou configurada como variáveis de ambiente, conforme indicado abaixo.
roleArn
, roleExternalID
, roleSessionName
também pode ser usado para autenticar com a AWS usando perfis de instância.
Se nenhum deles for especificado, ele usará a cadeia de provedores padrão da AWS.
Parâmetro | Tipo | Description |
---|---|---|
streamName |
STRING |
Lista obrigatória separada por vírgulas de um ou mais fluxos de cinese. |
awsAccessKey |
STRING |
A chave de acesso da AWS, se houver. Também pode ser especificado por meio das várias opções suportadas pela cadeia de provedores de credenciais padrão da AWS, incluindo variáveis de ambiente (AWS_ACCESS_KEY_ID ) e um arquivo de perfis de credenciais. |
awsSecretKey |
STRING |
A chave secreta que corresponde à chave de acesso. Pode ser especificado nos argumentos ou por meio das várias opções suportadas pela cadeia de provedores de credenciais padrão da AWS, incluindo variáveis de ambiente (AWS_SECRET_KEY ou AWS_SECRET_ACCESS_KEY ) e um arquivo de perfis de credenciais. |
roleArn |
STRING |
Nome do recurso da Amazon da função a ser assumida ao acessar o Kinesis. |
roleExternalId |
STRING |
Usado ao delegar acesso à conta da AWS. |
roleSessionName |
STRING |
Nome da sessão de função da AWS. |
stsEndpoint |
STRING |
Um ponto de extremidade para solicitar credenciais de acesso temporárias. |
region |
STRING |
Região para os fluxos a serem especificados. O padrão é a região resolvida localmente. |
endpoint |
STRING |
ponto de extremidade regional para fluxos de dados do Kinesis. O padrão é a região resolvida localmente. |
initialPosition |
STRING |
Posição inicial para leitura no fluxo. Um dos seguintes: «mais recente» (por defeito), «trim_horizon», «mais antigo», «at_timestamp». |
consumerMode |
STRING |
Um de: 'polling' (padrão), ou 'EFO' (enhanced-fan-out). |
consumerName |
STRING |
O nome do consumidor. Todos os consumidores são precedidos de «databricks_». O padrão é uma cadeia de caracteres vazia. |
registerConsumerTimeoutInterval |
STRING |
o tempo limite máximo para aguardar que o consumidor do Kinesis EFO seja registrado no fluxo do Kinesis antes de lançar um erro. O padrão é '300s'. |
requireConsumerDeregistration |
BOOLEAN |
true para cancelar o registro do consumidor EFO no encerramento da consulta. A predefinição é false . |
deregisterConsumerTimeoutInterval |
STRING |
O tempo limite máximo para aguardar que o consumidor do Kinesis EFO seja cancelado no fluxo do Kinesis antes de lançar um erro. O padrão é '300s'. |
consumerRefreshInterval |
STRING |
O intervalo em que o consumidor é verificado e atualizado. O padrão é '300s'. |
Os argumentos a seguir são usados para controlar a taxa de transferência de leitura e a latência do Kinesis:
Parâmetro | Tipo | Description |
---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
Opcional, com um padrão de 10.000 registros a serem lidos por solicitação de API para o Kinesis. |
maxFetchRate |
STRING |
Quão rápido para pré-buscar dados por fragmento. Um valor entre '1,0' e '2,0' medido em MB/s. O padrão é '1.0'. |
minFetchPeriod |
STRING |
O tempo máximo de espera entre tentativas consecutivas de pré-busca. O padrão é '400ms'. |
maxFetchDuration |
STRING |
A duração máxima para armazenar em buffer novos dados pré-obtidos. O padrão é '10s'. |
fetchBufferSize |
STRING |
A quantidade de dados para o próximo gatilho. O padrão é '20gb'. |
shardsPerTask |
INTEGER (>0) |
O número de fragmentos Kinesis a serem pré-buscados em paralelo por tarefa de faísca. A predefinição é 5. |
shardFetchinterval |
STRING |
Com que frequência sondar para reharding. O padrão é '1s'. |
coalesceThresholdBlockSize |
INTEGER (>0) |
O limiar a partir do qual ocorre a coalescência automática. O padrão é 10.000.000. |
coalesce |
BOOLEAN |
true para aglutinar pedidos pré-buscados. A predefinição é true . |
coalesceBinSize |
INTEGER (>0) |
O tamanho aproximado do bloco após a coalescência. O padrão é 128.000.000. |
reuseKinesisClient |
BOOLEAN |
true para reutilizar o cliente Kinesis armazenado no cache. O padrão é true exceto em um cluster PE. |
clientRetries |
INTEGER (>0) |
O número de novas tentativas no cenário de novas tentativas. A predefinição é 5. |
Devoluções
Uma tabela de registros do Kinesis com o seguinte esquema:
Name | Tipo de dados | Pode ser nulo | Standard | Description |
---|---|---|---|---|
partitionKey |
STRING |
Não | Uma chave que é usada para distribuir dados entre os fragmentos de um fluxo. Todos os registos de dados com a mesma chave de partição serão lidos a partir do mesmo fragmento. | |
data |
BINARY |
Não | A carga útil de dados kinesis, codificada em base-64. | |
stream |
STRING |
Não | O nome do fluxo de onde os dados foram lidos. | |
shardId |
STRING |
Não | Um identificador exclusivo para o fragmento de onde os dados foram lidos. | |
sequenceNumber |
BIGINT |
Não | O identificador exclusivo do registro em seu fragmento. | |
approximateArrivalTimestamp |
TIMESTAMP |
Não | A hora aproximada em que o registro foi inserido no fluxo. |
As colunas (stream, shardId, sequenceNumber)
constituem uma chave primária.
Exemplos
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');