função de streaming com valor de tabela read_kinesis
Aplica-se a: SQL do Databricks
Databricks Runtime 13.3 LTS e versões posteriores
Retorna uma tabela com registros lidos do Kinesis de um ou mais fluxos.
Sintaxe
read_kinesis ( { parameter => value } [, ...] )
Argumentos
read_kinesis
requer invocação de parâmetro nomeado.
O único argumento necessário é streamName
. Todos os outros argumentos são opcionais.
As descrições dos argumentos são breves aqui. Para obter mais detalhes, confira a documentação do Amazon Kinesis.
Há várias opções de conexão para se conectar e autenticar com a AWS.
awsAccessKey
e awsSecretKey
podem ser especificados nos argumentos de função usando a função secret, definidos manualmente nos argumentos ou configurados como variáveis de ambiente, conforme indicado abaixo.
roleArn
, roleExternalID
e roleSessionName
também podem ser usados para autenticar com a AWS usando perfis de instância.
Se nenhum deles for especificado, será usada a cadeia de provedores padrão da AWS.
Parâmetro | Tipo | Descrição |
---|---|---|
streamName |
STRING |
Obrigatório, lista separada por vírgulas de um ou mais fluxos do Kinesis. |
awsAccessKey |
STRING |
A chave de acesso da AWS, se houver. Também pode ser especificada por meio das várias opções compatíveis com a cadeia de provedores de credenciais padrão da AWS, incluindo variáveis de ambiente (AWS_ACCESS_KEY_ID ) e um arquivo de perfis de credenciais. |
awsSecretKey |
STRING |
A chave secreta que corresponde à chave de acesso. Também pode ser especificada por meio das várias opções compatíveis com a cadeia de provedores de credenciais padrão da AWS, incluindo variáveis de ambiente (AWS_SECRET_KEY ou AWS_SECRET_ACCESS_KEY ) e um arquivo de perfis de credenciais. |
roleArn |
STRING |
O nome de recurso da Amazon da função a ser assumida ao acessar o Kinesis. |
roleExternalId |
STRING |
Usado ao delegar acesso à conta da AWS. |
roleSessionName |
STRING |
Nome da sessão de função da AWS. |
stsEndpoint |
STRING |
Um ponto de extremidade para solicitar credenciais de acesso temporário. |
region |
STRING |
Região dos fluxos a serem especificados. O padrão é a região resolvida localmente. |
endpoint |
STRING |
ponto de extremidade regional para fluxos de dados do Kinesis. O padrão é a região resolvida localmente. |
initialPosition |
STRING |
Posição inicial para leitura no fluxo. Uma das opções: "latest" (padrão), "trim_horizon", "earliest", "at_timestamp". |
consumerMode |
STRING |
Uma das opções: "polling" (padrão) ou "EFO" (enhanced-fan-out). |
consumerName |
STRING |
O nome do consumidor. Todos os consumidores são prefixados com "databricks_". O padrão é uma cadeia de caracteres vazia. |
registerConsumerTimeoutInterval |
STRING |
o tempo limite máximo para aguardar que o consumidor de EFO do Kinesis seja registrado no fluxo do Kinesis antes de gerar um erro. O padrão é ''300s''. |
requireConsumerDeregistration |
BOOLEAN |
true para cancelar o registro do consumidor EFO no término da consulta. O padrão é false . |
deregisterConsumerTimeoutInterval |
STRING |
O tempo limite máximo a ser esperado para que o consumidor de EFO do Kinesis tenha o registro cancelado com o fluxo do Kinesis antes de gerar um erro. O padrão é ''300s''. |
consumerRefreshInterval |
STRING |
O intervalo no qual o consumidor é verificado e atualizado. O padrão é ''300s''. |
Os seguintes argumentos são usados para controlar a taxa de transferência de leitura e a latência do Kinesis:
Parâmetro | Tipo | Descrição |
---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
Opcional, com um padrão de 10.000, registros a serem lidos por solicitação de API para o Kinesis. |
maxFetchRate |
STRING |
A velocidade da pré-busca de dados por fragmento. Um valor entre ''1.0'' e ''2.0'' medido em MB/s. O padrão é "1.0". |
minFetchPeriod |
STRING |
O tempo de espera máximo entre tentativas de pré-busca consecutivas. O padrão é ''400ms''. |
maxFetchDuration |
STRING |
A duração máxima para armazenar os novos dados pré-buscados. O padrão é ’’10s’’. |
fetchBufferSize |
STRING |
A quantidade de dados para o próximo gatilho. O padrão é '’20gb’'. |
shardsPerTask |
INTEGER (>0) |
O número de fragmentos do Kinesis a serem pré-buscados em paralelo por tarefa do Spark. O padrão é 5. |
shardFetchinterval |
STRING |
Com que frequência pesquisar para recriar fragmentos. O padrão é ''1s''. |
coalesceThresholdBlockSize |
INTEGER (>0) |
O limite no qual a união automática ocorre. O padrão é 10.000.000. |
coalesce |
BOOLEAN |
true para unir solicitações pré-buscadas. O padrão é true . |
coalesceBinSize |
INTEGER (>0) |
O tamanho do bloco aproximado após a união. O padrão é 128.000.000. |
reuseKinesisClient |
BOOLEAN |
true para reutilizar o cliente Kinesis armazenado no cache. O padrão é true exceto em um cluster PE. |
clientRetries |
INTEGER (>0) |
O número de repetições no cenário de repetição. O padrão é 5. |
Retornos
Uma tabela de registros Kinesis com o seguinte esquema:
Nome | Tipo de dados | Nullable | Standard | Descrição |
---|---|---|---|---|
partitionKey |
STRING |
Não | Uma chave usada para distribuir dados entre os fragmentos de um fluxo. Todos os registros de dados com a mesma chave de partição serão lidos do mesmo fragmento. | |
data |
BINARY |
Não | O conteúdo de dados do Kinesis, codificado em base 64. | |
stream |
STRING |
Não | O nome do fluxo do qual os dados foram lidos. | |
shardId |
STRING |
Não | Um identificador exclusivo para o fragmento do qual os dados foram lidos. | |
sequenceNumber |
BIGINT |
Não | O identificador exclusivo do registro dentro de seu fragmento. | |
approximateArrivalTimestamp |
TIMESTAMP |
Não | A hora aproximada em que o registro foi inserido no fluxo. |
As colunas (stream, shardId, sequenceNumber)
constituem uma chave primária.
Exemplos
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');