Compartilhar via


Desenvolver código de pipeline com SQL

O Delta Live Tables apresenta várias novas palavras-chave e funções SQL para definir exibições materializadas e tabelas de streaming em pipelines. O suporte SQL para o desenvolvimento de pipelines baseia-se nos conceitos básicos do Spark SQL e adiciona suporte para a funcionalidade de Streaming Estruturado.

Os usuários familiarizados com o PySpark DataFrames podem preferir desenvolver código de pipeline com Python. O Python oferece suporte a testes e operações mais extensos que são difíceis de implementar com SQL, como operações de metaprogramação. Consulte Desenvolver código de pipeline com Python.

Para obter uma referência completa da sintaxe SQL do Delta Live Tables, consulte Referência da linguagem SQL do Delta Live Tables.

Noções básicas de SQL para desenvolvimento de pipeline

O código SQL que cria conjuntos de dados Delta Live Tables usa a sintaxe para definir exibições materializadas e tabelas de streaming em relação aos resultados da CREATE OR REFRESH consulta.

A STREAM palavra-chave indica se a fonte de dados referenciada em uma SELECT cláusula deve ser lida com semântica de streaming.

O código-fonte do Delta Live Tables difere criticamente dos scripts SQL: o Delta Live Tables avalia todas as definições de conjunto de dados em todos os arquivos de código-fonte configurados em um pipeline e cria um gráfico de fluxo de dados antes que qualquer consulta seja executada. A ordem das consultas que aparecem em um notebook ou script não define a ordem de execução.

Criar uma exibição materializada com SQL

O exemplo de código a seguir demonstra a sintaxe básica para criar uma exibição materializada com SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Criar uma tabela de streaming com SQL

O exemplo de código a seguir demonstra a sintaxe básica para criar uma tabela de streaming com SQL:

Observação

Nem todas as fontes de dados dão suporte a leituras de streaming e algumas fontes de dados sempre devem ser processadas com semântica de streaming.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Carregar dados do armazenamento de objetos

O Delta Live Tables dá suporte ao carregamento de dados de todos os formatos compatíveis com o Azure Databricks. Confira Opções de formato de arquivo.

Observação

Esses exemplos usam dados disponíveis no /databricks-datasets montado automaticamente em seu espaço de trabalho. O Databricks recomenda o uso de caminhos de volume ou URIs de nuvem para fazer referência a dados armazenados no armazenamento de objetos de nuvem. Confira O que são os volumes do Catálogo do Unity?.

O Databricks recomenda usar o Carregador Automático e as tabelas de streaming ao configurar cargas de trabalho de ingestão incremental em relação aos dados armazenados no armazenamento de objetos de nuvem. Confira O que é o Carregador Automático?.

O SQL usa a função para invocar a read_files funcionalidade do Carregador Automático. Você também deve usar a STREAM palavra-chave para configurar uma leitura de streaming com read_files.

O exemplo a seguir cria uma tabela de streaming de arquivos JSON usando o Carregador Automático:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

A read_files função também dá suporte à semântica em lote para criar exibições materializadas. O exemplo a seguir usa a semântica em lote para ler um diretório JSON e criar uma exibição materializada:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Valide dados com expectativas

Você pode usar expectativas para definir e impor restrições de qualidade de dados. Confira Gerenciar a qualidade dos dados com o Delta Live Tables.

O código a seguir define uma expectativa chamada valid_data que descarta registros nulos durante a ingestão de dados:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Consultar exibições materializadas e tabelas de streaming definidas em seu pipeline

Use o LIVE esquema para consultar outras exibições materializadas e tabelas de streaming definidas em seu pipeline.

O exemplo a seguir define quatro conjuntos de dados:

  • Uma tabela de streaming chamada orders que carrega dados JSON.
  • Uma exibição materializada chamada customers que carrega dados CSV.
  • Uma exibição materializada nomeada customer_orders que une orders registros dos conjuntos de dados e customers converte o carimbo de data/hora do pedido em uma data e seleciona os customer_idcampos , order_number, state, e order_date .
  • Uma exibição materializada chamada daily_orders_by_state que agrega a contagem diária de pedidos para cada estado.
CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;