read_kinesis
Função de streaming com valor de tabela
Aplica-se a: Databricks SQL
Databricks Runtime 13.3 LTS ou posteriores
Retorna uma tabela com registros lidos do Kinesis de um ou mais fluxos.
Sintaxe
read_kinesis ( { parameter => value } [, ...] )
Argumentos
read_kinesis
requer invocação de parâmetro nomeado.
O único argumento necessário é streamName
. Todos os outros argumentos são opcionais.
As descrições dos argumentos são breves aqui. Para obter mais detalhes, consulte a documentação do Amazon Kinesis .
Há várias opções de conexão para se conectar e autenticar com a AWS.
awsAccessKey
, e awsSecretKey
pode ser especificado nos argumentos da função usando a função secreta, definida manualmente nos argumentos ou configurada como variáveis de ambiente, conforme indicado abaixo.
roleArn
, roleExternalID
, roleSessionName
também pode ser usado para autenticar com a AWS usando perfis de instância.
Se nenhum deles for especificado, ele usará a cadeia de provedores padrão da AWS.
Parâmetro | Tipo | Descrição |
---|---|---|
streamName |
STRING |
Obrigatório: lista separada por vírgulas de um ou mais streams do Kinesis. |
awsAccessKey |
STRING |
A chave de acesso da AWS, se houver. Também pode ser especificado por meio das várias opções suportadas pela cadeia de provedores de credenciais padrão da AWS, incluindo variáveis de ambiente (AWS_ACCESS_KEY_ID ) e um arquivo de perfis de credenciais. |
awsSecretKey |
STRING |
A chave secreta que corresponde à chave de acesso. Pode ser especificado nos argumentos ou por meio das várias opções suportadas pela cadeia de provedores de credenciais padrão da AWS, incluindo variáveis de ambiente (AWS_SECRET_KEY ou AWS_SECRET_ACCESS_KEY ) e um arquivo de perfis de credenciais. |
roleArn |
STRING |
Nome do recurso Amazon da função a assumir ao aceder ao Kinesis. |
roleExternalId |
STRING |
Usado ao delegar acesso à conta da AWS. |
roleSessionName |
STRING |
Nome da sessão de função da AWS. |
stsEndpoint |
STRING |
Um ponto de extremidade para solicitar credenciais de acesso temporárias. |
region |
STRING |
Região para os fluxos a serem especificados. O padrão é a região predeterminada localmente. |
endpoint |
STRING |
ponto de extremidade regional para os fluxos de dados do Kinesis. O padrão é a região resolvida localmente. |
initialPosition |
STRING |
Posição inicial para leitura no fluxo. Um dos seguintes: «mais recente» (por defeito), «trim_horizon», «mais antigo», «at_timestamp». |
consumerMode |
STRING |
Um de: 'polling' (padrão), ou 'EFO' (enhanced-fan-out). |
consumerName |
STRING |
O nome do consumidor. Todos os consumidores são prefixados com «databricks_». O padrão é uma cadeia de caracteres vazia. |
registerConsumerTimeoutInterval |
STRING |
o tempo limite máximo para aguardar que o consumidor do Kinesis EFO se registe no fluxo do Kinesis antes de lançar um erro. O padrão é '300s'. |
requireConsumerDeregistration |
BOOLEAN |
true para anular o registo do consumidor EFO no término da consulta. A predefinição é false . |
deregisterConsumerTimeoutInterval |
STRING |
O tempo limite máximo para aguardar que o consumidor do Kinesis EFO seja desregistrado no stream do Kinesis antes de ocorrer um erro. O padrão é '300 segundos'. |
consumerRefreshInterval |
STRING |
O intervalo em que o consumidor é verificado e atualizado. O padrão é '300 segundos'. |
Os argumentos a seguir são usados para controlar a taxa de transferência de leitura e a latência do Kinesis:
Parâmetro | Tipo | Descrição |
---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
Opcional, com um padrão de 10.000 registros a serem lidos por solicitação de API para o Kinesis. |
maxFetchRate |
STRING |
A velocidade de pré-carregamento de dados por partição. Um valor entre '1,0' e '2,0' medido em MB/s. O padrão é '1.0'. |
minFetchPeriod |
STRING |
O tempo máximo de espera entre tentativas consecutivas de pré-busca. O padrão é '400ms'. |
maxFetchDuration |
STRING |
A duração máxima para armazenar em buffer novos dados pré-obtidos. O valor predefinido é '10s'. |
fetchBufferSize |
STRING |
A quantidade de dados para o próximo gatilho. O padrão é '20gb'. |
shardsPerTask |
INTEGER (>0) |
O número de fragmentos Kinesis a serem pré-buscados em paralelo por tarefa de faísca. A predefinição é 5. |
shardFetchinterval |
STRING |
Com que frequência verificar para redistribuição de fragmentos. O padrão é '1 segundo'. |
coalesceThresholdBlockSize |
INTEGER (>0) |
O limiar a partir do qual ocorre a coalescência automática. O padrão é 10.000.000. |
coalesce |
BOOLEAN |
true para aglutinar pedidos pré-buscados. A predefinição é true . |
coalesceBinSize |
INTEGER (>0) |
O tamanho aproximado do bloco após a coalescência. O padrão é 128.000.000. |
reuseKinesisClient |
BOOLEAN |
true para reutilizar o cliente Kinesis armazenado no cache. O padrão é true exceto em um cluster PE. |
clientRetries |
INTEGER (>0) |
O número de tentativas no cenário de novas tentativas. A predefinição é 5. |
Devoluções
Uma tabela de registros do Kinesis com o seguinte esquema:
Name | Tipo de dados | Pode ser nulo | Padrão | Descrição |
---|---|---|---|---|
partitionKey |
STRING |
Não | Uma chave que é usada para distribuir dados entre os fragmentos de um fluxo. Todos os registos de dados com a mesma chave de partição serão lidos a partir do mesmo fragmento. | |
data |
BINARY |
Não | A carga útil de dados Kinesis, codificada em base-64. | |
stream |
STRING |
Não | O nome do fluxo de onde os dados foram lidos. | |
shardId |
STRING |
Não | Um identificador exclusivo para o fragmento de onde os dados foram lidos. | |
sequenceNumber |
BIGINT |
Não | O identificador exclusivo do registro em seu fragmento. | |
approximateArrivalTimestamp |
TIMESTAMP |
Não | A hora aproximada em que o registro foi inserido no fluxo. |
As colunas (stream, shardId, sequenceNumber)
constituem uma chave primária.
Exemplos
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');