Partilhar via


Desenvolver código de pipeline com SQL

A DLT introduz várias novas palavras-chave e funções SQL para definir visualizações materializadas e tabelas de streaming em pipelines. O suporte SQL para o desenvolvimento de pipelines baseia-se nos conceitos básicos do Spark SQL e adiciona suporte para a funcionalidade de Streaming Estruturado.

Os usuários familiarizados com o PySpark DataFrames podem preferir desenvolver código de pipeline com Python. O Python suporta testes e operações mais abrangentes que são difíceis de implementar com SQL, como operações de metaprogramação. Consulte Desenvolver código de pipeline com Python.

Para obter uma referência completa da sintaxe DLT SQL, consulte referência da linguagem DLT SQL.

Noções básicas de SQL para desenvolvimento de pipeline

O código SQL que cria conjuntos de dados DLT usa a sintaxe CREATE OR REFRESH para definir exibições materializadas e tabelas de streaming em relação aos resultados da consulta.

A palavra-chave STREAM indica se a fonte de dados referenciada em uma cláusula SELECT deve ser lida com semântica de streaming.

As operações de leitura e escrita usam por padrão o catálogo e o esquema especificados durante a configuração do pipeline. Consulte Defina o catálogo de destino e o esquema.

O código-fonte DLT difere criticamente dos scripts SQL: O DLT avalia todas as definições de conjunto de dados em todos os arquivos de código-fonte configurados em um pipeline e cria um gráfico de fluxo de dados antes que quaisquer consultas sejam executadas. A ordem das consultas que aparecem em um bloco de anotações ou script define a ordem de avaliação do código, mas não a ordem de execução da consulta.

Criar uma vista materializada com SQL

O exemplo de código a seguir demonstra a sintaxe básica para criar uma exibição materializada com SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Criar uma tabela de streaming com SQL

O exemplo de código a seguir demonstra a sintaxe básica para criar uma tabela de streaming com SQL:

Observação

Nem todas as fontes de dados suportam leituras de streaming, e algumas fontes de dados devem sempre ser processadas com semântica de streaming.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Carregar dados do armazenamento de objetos

A DLT suporta o carregamento de dados de todos os formatos suportados pelo Azure Databricks. Consulte Opções de formato de dados.

Observação

Estes exemplos usam dados disponíveis no diretório /databricks-datasets, montados automaticamente no seu espaço de trabalho. O Databricks recomenda o uso de caminhos de volume ou URIs de nuvem para fazer referência a dados armazenados no armazenamento de objetos em nuvem. Consulte O que são os volumes do Unity Catalog?.

O Databricks recomenda o uso do Auto Loader e de tabelas de streaming ao configurar cargas de trabalho de ingestão incremental em relação aos dados armazenados no armazenamento de objetos na nuvem. Veja O que é o Auto Loader?.

SQL usa a função read_files para invocar a funcionalidade Auto Loader. Você também deve usar a palavra-chave STREAM para configurar uma leitura contínua com read_files.

O exemplo a seguir cria uma tabela de streaming a partir de arquivos JSON usando o Auto Loader:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

A função read_files também suporta semântica em lote para criar visualizações materializadas. O exemplo a seguir usa semântica em lote para ler um diretório JSON e criar uma exibição materializada:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Valide dados com expectativas

Você pode usar as expectativas para definir e impor restrições de qualidade de dados. Consulte Gerir a qualidade dos dados com as expectativas do fluxo de trabalho.

O código a seguir define uma expectativa chamada valid_data que descarta registros que são nulos durante a ingestão de dados:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Consultar visões materializadas e tabelas de streaming definidas no seu pipeline

O exemplo a seguir define quatro conjuntos de dados:

  • Uma tabela de streaming chamada orders que carrega dados JSON.
  • Uma exibição materializada chamada customers que carrega dados CSV.
  • Uma exibição materializada chamada customer_orders que une registros dos conjuntos de dados orders e customers, converte o carimbo de data/hora do pedido para uma data e seleciona os campos customer_id, order_number, statee order_date.
  • Uma visão materializada denominada daily_orders_by_state que agrega a contagem diária de pedidos para cada estado.

Observação

Ao consultar exibições ou tabelas em seu pipeline, você pode especificar o catálogo e o esquema diretamente ou pode usar os padrões configurados em seu pipeline. Neste exemplo, as tabelas orders, customerse customer_orders são escritas e lidas do catálogo e esquema padrão configurados para seu pipeline.

O modo de publicação herdado usa o esquema LIVE para consultar outras exibições materializadas e tabelas de streaming definidas em seu pipeline. Em novos pipelines, a sintaxe do esquema LIVE é silenciosamente ignorada. Veja o esquema LIVE (antigo) .

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;