Compartilhar via


função de streaming com valor de tabela read_kinesis

Aplica-se a:verificação marcada como sim SQL do Databricks verificação marcada como sim Databricks Runtime 13.3 LTS e versões posteriores

Retorna uma tabela com registros lidos do Kinesis de um ou mais fluxos.

Sintaxe

read_kinesis ( { parameter => value } [, ...] )

Argumentos

read_kinesis requer invocação de parâmetro nomeado.

O único argumento necessário é streamName. Todos os outros argumentos são opcionais.

As descrições dos argumentos são breves aqui. Para obter mais detalhes, confira a documentação do Amazon Kinesis.

Há várias opções de conexão para se conectar e autenticar com a AWS. awsAccessKey e awsSecretKey podem ser especificados nos argumentos de função usando a função secret, definidos manualmente nos argumentos ou configurados como variáveis de ambiente, conforme indicado abaixo. roleArn, roleExternalID e roleSessionName também podem ser usados para autenticar com a AWS usando perfis de instância. Se nenhum deles for especificado, será usada a cadeia de provedores padrão da AWS.

Parâmetro Tipo Descrição
streamName STRING Obrigatório, lista separada por vírgulas de um ou mais fluxos do Kinesis.
awsAccessKey STRING A chave de acesso da AWS, se houver. Também pode ser especificada por meio das várias opções compatíveis com a cadeia de provedores de credenciais padrão da AWS, incluindo variáveis de ambiente (AWS_ACCESS_KEY_ID) e um arquivo de perfis de credenciais.
awsSecretKey STRING A chave secreta que corresponde à chave de acesso. Também pode ser especificada por meio das várias opções compatíveis com a cadeia de provedores de credenciais padrão da AWS, incluindo variáveis de ambiente (AWS_SECRET_KEY ou AWS_SECRET_ACCESS_KEY) e um arquivo de perfis de credenciais.
roleArn STRING O nome de recurso da Amazon da função a ser assumida ao acessar o Kinesis.
roleExternalId STRING Usado ao delegar acesso à conta da AWS.
roleSessionName STRING Nome da sessão de função da AWS.
stsEndpoint STRING Um ponto de extremidade para solicitar credenciais de acesso temporário.
region STRING Região dos fluxos a serem especificados. O padrão é a região resolvida localmente.
endpoint STRING ponto de extremidade regional para fluxos de dados do Kinesis. O padrão é a região resolvida localmente.
initialPosition STRING Posição inicial para leitura no fluxo. Uma das opções: "latest" (padrão), "trim_horizon", "earliest", "at_timestamp".
consumerMode STRING Uma das opções: "polling" (padrão) ou "EFO" (enhanced-fan-out).
consumerName STRING O nome do consumidor. Todos os consumidores são prefixados com "databricks_". O padrão é uma cadeia de caracteres vazia.
registerConsumerTimeoutInterval STRING o tempo limite máximo para aguardar que o consumidor de EFO do Kinesis seja registrado no fluxo do Kinesis antes de gerar um erro. O padrão é ''300s''.
requireConsumerDeregistration BOOLEAN true para cancelar o registro do consumidor EFO no término da consulta. O padrão é false.
deregisterConsumerTimeoutInterval STRING O tempo limite máximo a ser esperado para que o consumidor de EFO do Kinesis tenha o registro cancelado com o fluxo do Kinesis antes de gerar um erro. O padrão é ''300s''.
consumerRefreshInterval STRING O intervalo no qual o consumidor é verificado e atualizado. O padrão é ''300s''.

Os seguintes argumentos são usados para controlar a taxa de transferência de leitura e a latência do Kinesis:

Parâmetro Tipo Descrição
maxRecordsPerFetch INTEGER (>0) Opcional, com um padrão de 10.000, registros a serem lidos por solicitação de API para o Kinesis.
maxFetchRate STRING A velocidade da pré-busca de dados por fragmento. Um valor entre ''1.0'' e ''2.0'' medido em MB/s. O padrão é "1.0".
minFetchPeriod STRING O tempo de espera máximo entre tentativas de pré-busca consecutivas. O padrão é ''400ms''.
maxFetchDuration STRING A duração máxima para armazenar os novos dados pré-buscados. O padrão é ’’10s’’.
fetchBufferSize STRING A quantidade de dados para o próximo gatilho. O padrão é '’20gb’'.
shardsPerTask INTEGER (>0) O número de fragmentos do Kinesis a serem pré-buscados em paralelo por tarefa do Spark. O padrão é 5.
shardFetchinterval STRING Com que frequência pesquisar para recriar fragmentos. O padrão é ''1s''.
coalesceThresholdBlockSize INTEGER (>0) O limite no qual a união automática ocorre. O padrão é 10.000.000.
coalesce BOOLEAN true para unir solicitações pré-buscadas. O padrão é true.
coalesceBinSize INTEGER (>0) O tamanho do bloco aproximado após a união. O padrão é 128.000.000.
reuseKinesisClient BOOLEAN true para reutilizar o cliente Kinesis armazenado no cache. O padrão é true exceto em um cluster PE.
clientRetries INTEGER (>0) O número de repetições no cenário de repetição. O padrão é 5.

Retornos

Uma tabela de registros Kinesis com o seguinte esquema:

Nome Tipo de dados Nullable Standard Descrição
partitionKey STRING Não Uma chave usada para distribuir dados entre os fragmentos de um fluxo. Todos os registros de dados com a mesma chave de partição serão lidos do mesmo fragmento.
data BINARY Não O conteúdo de dados do Kinesis, codificado em base 64.
stream STRING Não O nome do fluxo do qual os dados foram lidos.
shardId STRING Não Um identificador exclusivo para o fragmento do qual os dados foram lidos.
sequenceNumber BIGINT Não O identificador exclusivo do registro dentro de seu fragmento.
approximateArrivalTimestamp TIMESTAMP Não A hora aproximada em que o registro foi inserido no fluxo.

As colunas (stream, shardId, sequenceNumber) constituem uma chave primária.

Exemplos

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
        awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');