Compartilhar via


Referência da linguagem DLT SQL

Este artigo tem detalhes para a interface de programação DLT SQL.

  • Para obter informações sobre a API do Python, consulte a referência da linguagem Python DLT.
  • Para obter mais informações sobre comandos SQL, consulte referência de linguagem SQL.

Você pode usar funções definidas pelo usuário (UDFs) do Python em suas consultas SQL, mas deve definir esses UDFs em arquivos Python antes de chamá-los em arquivos de origem SQL. Consulte Funções escalares definidas pelo usuário - Python.

Limitações

A cláusula PIVOT não é suportada. A operação pivot no Spark requer o carregamento ansioso de dados de entrada para calcular o esquema de saída. Esta capacidade não é suportada na DLT.

Criar uma vista materializada DLT ou uma tabela de streaming

Observação

A sintaxe CREATE OR REFRESH LIVE TABLE para criar uma exibição materializada foi preterida. Em vez disso, use CREATE OR REFRESH MATERIALIZED VIEW.

Você usa a mesma sintaxe SQL básica ao declarar uma tabela de streaming ou uma exibição materializada.

Declarar uma exibição materializada de DLT com SQL

A seguir descrevemos a sintaxe para declarar uma exibição materializada em DLT com SQL:

CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  CLUSTER BY clause
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Declarar uma tabela de streaming DLT com SQL

Você só pode declarar tabelas de streaming usando consultas que são lidas em relação a uma fonte de streaming. A Databricks recomenda o uso do Auto Loader para transmitir a ingestão de arquivos do armazenamento de objetos na nuvem. Consulte a sintaxe SQL do Auto Loader em .

Ao especificar outras tabelas ou exibições em seu pipeline como fontes de streaming, você deve incluir a função STREAM() em torno de um nome de conjunto de dados.

A seguir descreve-se a sintaxe para declarar uma tabela de Streaming em DLT com SQL:

CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [CLUSTER BY clause]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Criar uma vista DLT

A seguir descrevemos a sintaxe para declarar modos de exibição com SQL:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

Sintaxe SQL do Auto Loader

A seguir descrevemos a sintaxe para trabalhar com o Auto Loader em SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value>",
      "<option-key>", "<option_value>",
      ...
    )
  )

Você pode usar as opções de formato suportadas com o Auto Loader. Usando a função map(), você pode passar opções para o método read_files(). Opções são pares chave-valor, onde as chaves e os valores são cadeias de caracteres. Para obter detalhes sobre formatos e opções de suporte, consulte Opções de formato de arquivo.

Exemplo: Definir tabelas

Você pode criar um conjunto de dados lendo a partir de uma fonte de dados externa ou de conjuntos de dados definidos em um pipeline. Para ler a partir de um conjunto de dados interno, especifique o nome da tabela que usará os padrões de pipeline configurados para catálogo e esquema. O exemplo a seguir define dois conjuntos de dados diferentes: uma tabela chamada taxi_raw que usa um arquivo JSON como fonte de entrada e uma tabela chamada filtered_data que usa a tabela taxi_raw como entrada:

CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
  ...
FROM taxi_raw

Exemplo: Ler a partir de uma fonte de streaming

Para ler dados de uma fonte de streaming, por exemplo, Auto Loader ou um conjunto de dados interno, defina uma tabela STREAMING:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

Para obter mais informações sobre a transmissão de dados, consulte Transforme dados com pipelines.

Exclusão permanente de registos de uma visualização materializada ou tabela de streaming

Para excluir permanentemente registros de uma exibição materializada ou tabela de streaming com vetores de exclusão habilitados, como para conformidade com o GDPR, operações adicionais devem ser executadas nas tabelas Delta subjacentes do objeto. Para garantir a exclusão de registros de uma exibição materializada, consulte Excluir permanentemente registros de uma exibição materializada com vetores de exclusão habilitados. Para garantir a exclusão de registros de uma tabela de streaming, consulte Excluir permanentemente registros de uma tabela de streaming.

Controlar como as tabelas são materializadas

As tabelas também oferecem um controlo adicional da sua materialização:

  • Especifique como tabelas de de cluster usando CLUSTER BY. Você pode usar agrupamento líquido para acelerar consultas. Consulte Usar agrupamento líquido para tabelas Delta.
  • Especifique como as tabelas são particionadas usando PARTITIONED BY.
  • Você pode definir as propriedades da tabela usando TBLPROPERTIES. Consulte as propriedades da tabela DLT .
  • Defina um local de armazenamento usando a configuração LOCATION. Por padrão, os dados da tabela são armazenados no local de armazenamento do pipeline se LOCATION não estiver definido.
  • Você pode usar colunas geradas na sua definição de esquema. Veja Exemplo: Especificar um esquema e colunas de agrupamento.

Observação

Para tabelas com menos de 1 TB de tamanho, o Databricks recomenda permitir que a DLT controle a organização dos dados. A menos que você espere que sua tabela cresça além de um terabyte, o Databricks recomenda que você não especifique colunas de partição.

Exemplo: Especificar um esquema e colunas do cluster

Opcionalmente, você pode especificar um esquema ao definir uma tabela. O exemplo a seguir especifica o esquema para a tabela de destino, incluindo o uso do Delta Lake colunas geradas e define colunas de clustering para a tabela:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) CLUSTER BY (order_day_of_week, customer_id)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Por padrão, a DLT infere o esquema da definição de table se você não especificar um esquema.

Exemplo: Especificar colunas de partição

Opcionalmente, você pode especificar colunas de partição para a tabela:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

O agrupamento líquido oferece uma solução flexível e otimizada para agrupamento. Considere usar CLUSTER BY em vez de PARTITIONED BY para DLT.

Exemplo: Definir restrições de tabela

Observação

O suporte de DLT para as restrições de tabela está em visualização pública . Para definir restrições de tabela, seu pipeline deve ser um pipeline habilitado para Unity Catalog e configurado para usar o canal preview.

Ao especificar um esquema, você pode definir chaves primárias e estrangeiras. As restrições são informativas e não são aplicadas. Consulte a cláusula CONSTRAINT na referência da linguagem SQL.

O exemplo a seguir define uma tabela com uma restrição de chave primária e estrangeira:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Parametrizar valores usados ao declarar tabelas ou exibições com SQL

Use SET para especificar um valor de configuração em uma consulta que declare uma tabela ou exibição, incluindo configurações do Spark. Qualquer tabela ou exibição definida em um bloco de anotações após a instrução SET tem acesso ao valor definido. Todas as configurações do Spark especificadas usando a instrução SET são usadas ao executar a consulta Spark para qualquer tabela ou exibição após a instrução SET. Para ler um valor de configuração numa consulta, utilize a sintaxe de interpolação de string ${}. O exemplo a seguir define um valor de configuração do Spark chamado startDate e usa esse valor em uma consulta:

SET startDate='2020-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Para especificar vários valores de configuração, use uma instrução SET separada para cada valor.

Exemplo: Definir um filtro de linha e uma máscara de coluna

Importante

Os filtros de linha e as máscaras de coluna estão em Pré-visualização Pública .

Para criar uma exibição materializada ou uma tabela de streaming com um filtro de linha e uma máscara de coluna, use a cláusula ROW FILTER e a cláusula MASK . O exemplo a seguir demonstra como definir um modo de exibição materializado e uma tabela Streaming com um filtro de linha e uma máscara de coluna:

CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)

CREATE OR REFRESH MATERIALIZED VIEW sales (
  customer_id STRING MASK catalog.schema.customer_id_mask_fn,
  customer_name STRING,
  number_of_line_items STRING COMMENT 'Number of items in the order',
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM sales_bronze

Para obter mais informações sobre filtros de linha e máscaras de coluna, consulte Publicar tabelas com filtros de linha e máscaras de coluna.

Propriedades SQL

CREATE TABLE ou VER
TEMPORARY

Crie uma tabela, mas não publique metadados para a tabela. A cláusula TEMPORARY instrui a DLT a criar uma tabela que está disponível para o pipeline, mas não deve ser acessada fora do pipeline. Para reduzir o tempo de processamento, uma tabela temporária persiste durante o tempo de vida do pipeline que a cria, e não apenas uma única atualização.
STREAMING

Crie uma tabela que leia um conjunto de dados de entrada como um fluxo. O conjunto de dados de entrada deve ser uma fonte de dados de streaming, por exemplo, Auto Loader ou uma tabela STREAMING.
CLUSTER BY

Habilite o clustering líquido na tabela e defina as colunas a serem usadas como chaves de clustering.

Consulte Usar agrupamento líquido para tabelas Delta.
PARTITIONED BY

Uma lista opcional de uma ou mais colunas a serem usadas para particionar a tabela.
LOCATION

Um local de armazenamento opcional para dados de tabela. Se não estiver definido, o sistema assumirá como padrão o local de armazenamento do pipeline.
COMMENT

Uma descrição opcional para a tabela.
column_constraint

Uma chave primária informativa opcional ou uma restrição de chave estrangeira na coluna.
MASK clause (Visualização pública)

Adiciona uma função de máscara de coluna para anonimizar dados confidenciais. Consultas futuras para essa coluna retornam o resultado da função avaliada em vez do valor original da coluna. Isso é útil para um controle de acesso refinado, porque a função pode verificar a identidade do utilizador e as associações de grupo para decidir se deve ou não ocultar o valor.

Consulte Coluna mask cláusula.
table_constraint

Uma chave primária informativa opcional ou restrição de chave estrangeira na tabela.
TBLPROPERTIES

Uma lista opcional de propriedades da tabela .
WITH ROW FILTER clause (Visualização pública)

Adiciona uma função de filtro de linha à tabela. Consultas futuras para essa tabela recebem um subconjunto das linhas para as quais a função é avaliada como TRUE. Isso é útil para controle de acesso refinado, porque permite que a função inspecione a identidade e as associações de grupo do usuário que invoca para decidir se deseja filtrar determinadas linhas.

Ver ROW FILTER cláusula.
select_statement

Uma consulta DLT que define o conjunto de dados para a tabela.
CONSTRAINT cláusula
EXPECT expectation_name

Defina a restrição de qualidade de dados expectation_name. Se a restrição ON VIOLATION não estiver definida, adicione linhas que violem a restrição ao conjunto de dados de destino.
ON VIOLATION

Ação opcional a ser executada para linhas com falha:

- FAIL UPDATE: Pare imediatamente a execução do pipeline.
- DROP ROW: Largue o registo e continue a processar.

Alterar a captura de dados com SQL em DLT

Use a instrução APPLY CHANGES INTO para usar a funcionalidade DLT CDC, conforme descrito a seguir:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Você define restrições de qualidade de dados para um destino APPLY CHANGES usando a mesma cláusula CONSTRAINT que consultas nãoAPPLY CHANGES. Veja Gerir a qualidade dos dados com as expectativas do pipeline.

Observação

O comportamento padrão para eventos INSERT e UPDATE é upsert eventos CDC da origem: atualizar quaisquer linhas na tabela de destino que correspondam à(s) chave(s) especificada(s) ou inserir uma nova linha quando um registro correspondente não existir na tabela de destino. A manipulação de eventos DELETE pode ser especificada com a condição APPLY AS DELETE WHEN.

Importante

Você deve declarar uma tabela de streaming de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de destino APPLY CHANGES, você também deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados que o campo sequence_by.

Consulte AS API DE APLICAÇÃO DE ALTERAÇÕES: Simplifique a captura de alterações de dados com DLT.

Cláusulas
KEYS

A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino.

Para definir uma combinação de colunas, use uma lista de colunas separadas por vírgula.

Esta cláusula é obrigatória.
IGNORE NULL UPDATES

Permitir a ingestão de atualizações contendo um subconjunto das colunas de destino. Quando um evento CDC corresponde a uma linha existente e IGNORE NULL UPDATES é especificado, as colunas com um null manterão seus valores existentes no destino. Isso também se aplica a colunas aninhadas com um valor de null.

Esta cláusula é facultativa.

O padrão é substituir colunas existentes por valores null.
APPLY AS DELETE WHEN

Especifica quando um evento CDC deve ser tratado como um DELETE em vez de um upsert. Para lidar com dados fora de ordem, a linha eliminada é temporariamente mantida como um marcador de exclusão na tabela Delta subjacente, e uma visão é criada no metastore que filtra esses marcadores de exclusão. O intervalo de retenção pode ser configurado com o
pipelines.cdc.tombstoneGCThresholdInSeconds propriedade de table.

Esta cláusula é facultativa.
APPLY AS TRUNCATE WHEN

Especifica quando um evento de CDC deve ser tratado como uma tabela completa TRUNCATE. Como essa cláusula aciona um truncado completo da tabela de destino, ela deve ser usada apenas para casos de uso específicos que exijam essa funcionalidade.

A cláusula APPLY AS TRUNCATE WHEN é suportada apenas para SCD tipo 1. SCD tipo 2 não suporta a operação de truncamento.

Esta cláusula é facultativa.
SEQUENCE BY

O nome da coluna que especifica a ordem lógica dos eventos CDC nos dados de origem. A DLT usa esse sequenciamento para lidar com eventos de alteração que chegam fora de ordem.

A coluna especificada deve ser um tipo de dados classificável.

Esta cláusula é obrigatória.
COLUMNS

Especifica um subconjunto de colunas a serem incluídas na tabela de destino. Pode optar por:

- Especifique a lista completa de colunas a incluir: COLUMNS (userId, name, city).
- Especifique uma lista de colunas a excluir: COLUMNS * EXCEPT (operation, sequenceNum)

Esta cláusula é facultativa.

O padrão é incluir todas as colunas na tabela de destino quando a cláusula COLUMNS não for especificada.
STORED AS

Se deseja armazenar registros como SCD tipo 1 ou SCD tipo 2.

Esta cláusula é facultativa.

O padrão é SCD tipo 1.
TRACK HISTORY ON

Especifica um subconjunto de colunas de saída para gerar registros de histórico quando houver alterações nessas colunas especificadas. Pode optar por:

- Especifique a lista completa de colunas a acompanhar: COLUMNS (userId, name, city).
- Especifique uma lista de colunas a serem excluídas do rastreamento: COLUMNS * EXCEPT (operation, sequenceNum)

Esta cláusula é facultativa. O padrão é controlar o histórico de todas as colunas de saída quando houver alterações, equivalente a TRACK HISTORY ON *.