Compartilhar via


Consultar dados de streaming

Você pode usar o Azure Databricks para consultar fontes de dados de streaming usando o Streaming Estruturado. O Azure Databricks fornece amplo suporte para cargas de trabalho de streaming em Python e Scala e dá suporte à maioria das funcionalidades de Streaming Estruturado com SQL.

Os exemplos a seguir demonstram o uso de um coletor de memória para inspeção manual de dados de streaming durante o desenvolvimento interativo em notebooks. Devido aos limites de saída de linha na interface do usuário do notebook, talvez você não observe todos os dados lidos por consultas de streaming. Nas cargas de trabalho de produção, você só deve disparar consultas de streaming gravando-as em uma tabela de destino ou sistema externo.

Observação

O suporte do SQL para consultas interativas em dados de streaming é limitado a notebooks em execução na computação de todas as finalidades. Você também pode usar o SQL ao declarar tabelas de streaming no Databricks SQL ou nas Tabelas Dinâmicas Delta. Confira Carregar dados usando tabelas de streaming no Databricks SQL e O que são Tabelas Dinâmicas Delta?.

Consultar dados de sistemas de streaming

O Azure Databricks fornece leitores de dados de streaming para os seguintes sistemas de streaming:

  • Kafka
  • Kinesis
  • PubSub
  • Pulsar

Você precisa fornecer detalhes de configuração ao inicializar consultas em relação a esses sistemas, que variam dependendo do ambiente configurado e do sistema do qual você escolhe ler. Consulte Configurar fontes de dados de streaming.

Cargas de trabalho comuns que envolvem sistemas de streaming incluem ingestão de dados para o lakehouse e processamento de fluxo para coletar dados para sistemas externos. Para obter mais informações sobre cargas de trabalho de streaming, confira Streaming no Azure Databricks.

Os exemplos a seguir demonstram uma leitura interativa de streaming do Kafka:

Python

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

Consultar uma tabela como uma leitura de streaming

O Azure Databricks cria todas as tabelas usando Delta Lake por padrão. Quando você executa uma consulta de streaming em uma tabela Delta, a consulta automaticamente seleciona novos registros quando uma versão da tabela é confirmada. Por padrão, as consultas de streaming esperam que as tabelas de origem contenham apenas registros acrescentados. Se você precisar trabalhar com dados de streaming que contenham atualizações e exclusões, o Databricks recomenda usar Tabelas Dinâmicas Delta e APPLY CHANGES INTO. Confira a API APPLY CHANGES: Simplifique a captura de dados de alteração nas tabelas Delta Live.

Os exemplos a seguir demonstram a execução de uma leitura interativa de streaming de uma tabela:

Python

display(spark.readStream.table("table_name"))

SQL

SELECT * FROM STREAM table_name

Consultar dados no armazenamento de objetos de nuvem com o Carregador Automático

Você pode transmitir dados do armazenamento de objetos de nuvem usando o Carregador Automático, o conector de dados de nuvem do Azure Databricks. Você pode usar o conector com arquivos armazenados em volumes do Catálogo do Unity ou em outros locais de armazenamento de objetos de nuvem. O Databricks recomenda o uso de volumes para gerenciar o acesso aos dados no armazenamento de objetos na nuvem. Confira Conectar-se a fontes de dados.

O Azure Databricks otimiza esse conector para ingestão de streaming de dados no armazenamento de objetos de nuvem armazenados em formatos estruturados, semiestruturados e não estruturados populares. O Databricks recomenda armazenar dados ingeridos em um formato quase bruto para maximizar a taxa de transferência e minimizar possíveis perdas de dados devido a registros corrompidos ou alterações de esquema.

Para obter mais recomendações sobre como ingerir dados do armazenamento de objetos de nuvem, confira Ingerir dados em um lakehouse do Databricks.

Os exemplos a seguir demonstram uma leitura interativa de streaming de um diretório de arquivos JSON em um volume:

Python

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))

SQL

SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')