Partilhar via


Referência da linguagem DLT Python

Este artigo tem detalhes para a interface de programação DLT Python.

Para obter informações sobre a API SQL, consulte a referência de linguagem SQL DLT .

Para obter detalhes específicos para configurar o Auto Loader, consulte O que é o Auto Loader?.

Antes de começar

A seguir estão considerações importantes quando você implementa pipelines com a interface DLT Python:

  • Como as funções Python table() e view() são invocadas várias vezes durante o planejamento e a execução de uma atualização de pipeline, não inclua código em uma dessas funções que possa ter efeitos colaterais (por exemplo, código que modifica dados ou envia um e-mail). Para evitar um comportamento inesperado, suas funções Python que definem conjuntos de dados devem incluir apenas o código necessário para definir a tabela ou exibição.
  • Para realizar operações como o envio de e-mails ou a integração com um serviço de monitoramento externo, particularmente em funções que definem conjuntos de dados, use ganchos de eventos. A implementação dessas operações nas funções que definem seus conjuntos de dados causará um comportamento inesperado.
  • As funções Python table e view devem retornar um DataFrame. Algumas funções que operam em DataFrames não retornam DataFrames e não devem ser usadas. Essas operações incluem funções como collect(), count(), toPandas(), save()e saveAsTable(). Como as transformações de DataFrame são executadas após o gráfico de fluxo de dados completo ter sido resolvido, o uso dessas operações pode ter efeitos colaterais não intencionais.

Importar o módulo dlt Python

As funções DLT Python são definidas no módulo dlt. Seus pipelines implementados com a API Python devem importar este módulo:

import dlt

Criar uma vista materializada DLT ou uma tabela de streaming

Em Python, a DLT determina se um conjunto de dados deve ser atualizado como uma exibição materializada ou uma tabela de streaming com base na consulta definidora. O decorador @table pode ser usado para definir tanto vistas materializadas como tabelas de fluxo contínuo.

Para definir uma exibição materializada em Python, aplique @table a uma consulta que executa uma leitura estática em relação a uma fonte de dados. Para definir uma tabela de streaming, aplique @table a uma consulta que execute uma leitura de streaming em relação a uma fonte de dados ou use a função create_streaming_table(). Ambos os tipos de conjunto de dados têm a mesma especificação de sintaxe da seguinte maneira:

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Criar uma vista DLT

Para definir uma vista em Python, aplique o @view decorador. Como o decorador de @table, você pode usar visualizações em DLT para conjuntos de dados estáticos ou de streaming. A sintaxe a seguir está para definir modos de exibição com Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Exemplo: Definir tabelas e modos de exibição

Para definir uma tabela ou vista em Python, aplique o decorador @dlt.view ou @dlt.table a uma função. Você pode usar o nome da função ou o parâmetro name para atribuir o nome da tabela ou da exibição. O exemplo a seguir define dois conjuntos de dados diferentes: um modo de exibição chamado taxi_raw que usa um arquivo JSON como fonte de entrada e uma tabela chamada filtered_data que usa a exibição taxi_raw como entrada:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return spark.read.table("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return spark.read.table("taxi_raw").where(...)

Exemplo: acessar um conjunto de dados definido no mesmo pipeline

Observação

Embora as funções dlt.read() e dlt.read_stream() ainda estejam disponíveis e sejam totalmente suportadas pela interface DLT Python, o Databricks recomenda sempre usar as funções spark.read.table() e spark.readStream.table() devido ao seguinte:

  • As funções spark suportam a leitura de conjuntos de dados internos e externos, incluindo conjuntos de dados em armazenamento externo ou definidos em outros pipelines. As funções dlt suportam apenas a leitura de conjuntos de dados internos.
  • As funções spark suportam a especificação de opções, como skipChangeCommits, para operações de leitura. A especificação de opções não é suportada pelas funções dlt.

Para acessar um conjunto de dados definido no mesmo pipeline, use as funções spark.read.table() ou spark.readStream.table():

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return spark.read.table("customers_raw").where(...)

Observação

Ao consultar exibições ou tabelas em seu pipeline, você pode especificar o catálogo e o esquema diretamente ou pode usar os padrões configurados em seu pipeline. Neste exemplo, a tabela customersé escrita e lida a partir do catálogo e esquema padrão configurados para seu pipeline.

Exemplo: Ler a partir de uma tabela registada num metastore

Para ler dados de uma tabela registrada no metastore do Hive, no argumento da função, você pode qualificar o nome da tabela com o nome do banco de dados:

@dlt.table
def customers():
  return spark.read.table("sales.customers").where(...)

Para obter um exemplo de leitura de uma tabela do Catálogo Unity, consulte Ingestão de dados em um pipeline do Catálogo Unity.

Exemplo: acessar um conjunto de dados usando spark.sql

Você também pode retornar um conjunto de dados usando uma expressão spark.sql em uma função de consulta. Para ler a partir de um conjunto de dados interno, podes deixar o nome não qualificado para usar o catálogo e o esquema predefinidos ou podes antepô-los.

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM catalog_name.schema_name.customers_cleaned WHERE city = 'Chicago'")

Excluir permanentemente registos de uma vista materializada ou tabela de streaming

Para excluir permanentemente registros de uma exibição materializada ou tabela de streaming com vetores de exclusão habilitados, como para conformidade com o GDPR, operações adicionais devem ser executadas nas tabelas Delta subjacentes do objeto. Para garantir a exclusão de registros de uma exibição materializada, consulte Excluir permanentemente registros de uma exibição materializada com vetores de exclusão habilitados. Para garantir a exclusão de registros de uma tabela de streaming, consulte Excluir permanentemente registros de uma tabela de streaming.

Gravar em serviços de streaming de eventos externos ou tabelas Delta com a API DLT de sink

Importante

A API de sink DLT está em visualização pública .

Observação

  • A execução de um de atualização completa do não limpa os dados dos coletores. Todos os dados reprocessados serão anexados ao coletor e os dados existentes não serão alterados.
  • As expectativas de DLT não são suportadas com a API sink.

Para escrever num serviço de transmissão contínua de eventos, como Apache Kafka ou Azure Event Hubs, ou numa tabela Delta a partir de um pipeline DLT, use a função create_sink() incluída no módulo dlt Python. Depois de criar um sink com a função create_sink(), utilize-o num fluxo de acréscimo para escrever dados nele. O fluxo de anexação é o único tipo de fluxo suportado com a função create_sink(). Não há suporte para outros tipos de fluxo, como apply_changes, .

A sintaxe para criar um coletor com a função create_sink() segue:

create_sink(<sink_name>, <format>, <options>)
Argumentos
name

Tipo: str

Uma cadeia de caracteres que identifica o coletor e é usada para fazer referência e gerenciar o coletor. Os nomes dos coletores devem ser exclusivos para o pipeline, inclusive em todo o código-fonte, como blocos de anotações ou módulos que fazem parte do pipeline.

Este parâmetro é obrigatório.
format

Tipo: str

Uma cadeia de caracteres que define o formato de saída, kafka ou delta.

Este parâmetro é obrigatório.
options

Tipo: dict

Uma lista opcional de opções de sink, formatada como {"key": "value"}, onde a chave e o valor são ambos cadeias de caracteres. Todas as opções de tempo de execução do Databricks suportadas pelos coletores Kafka e Delta são suportadas. Para opções de Kafka, consulte Configurar o gravador de Kafka Structured Streaming. Para opções Delta, consulte tabela Delta como um coletor.

Exemplo: Criar um lavatório Kafka com a função create_sink()

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

Exemplo: Criar um coletor Delta com a função create_sink() e um caminho do sistema de arquivos

O exemplo a seguir cria um coletor que grava em uma tabela Delta passando o caminho do sistema de arquivos para a tabela:

create_sink(
  "my_delta_sink",
    "delta",
    { "path": "//path/to/my/delta/table" }
)

Exemplo: Criar um coletor Delta com a função create_sink() e um nome de tabela do Unity Catalog

Observação

O Delta sink suporta tabelas externas e geridas do Unity Catalog e tabelas geridas pelo metastore do Hive. Os nomes das tabelas devem ser totalmente qualificados. Por exemplo, as tabelas do Catálogo Unity devem usar um identificador de três camadas: <catalog>.<schema>.<table>. As tabelas de metastore do Hive devem usar <schema>.<table>.

O exemplo a seguir cria um coletor que grava em uma tabela Delta passando o nome de uma tabela no Unity Catalog:

create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)

Exemplo: Use um fluxo de acréscimo para gravar em um coletor Delta

O exemplo a seguir cria um coletor que grava em uma tabela Delta e, em seguida, cria um fluxo de acréscimo para gravar nesse coletor:

create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

@append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

Exemplo: Use um fluxo de anexação para gravar em um coletor de Kafka

O exemplo a seguir cria um sink que grava num tópico Kafka e, em seguida, cria um fluxo de adição para gravar nesse sink:

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))

O esquema do DataFrame gravado em Kafka deve incluir as colunas especificadas em Configure the Kafka Structured Streaming writer.

Crie uma tabela para usar como destino de operações de streaming

Use a função create_streaming_table() para criar uma tabela de destino para a saída de registos por operações de streaming, incluindo apply_changes(), apply_changes_from_snapshot()e registos de saída @append_flow.

Observação

As funções create_target_table() e create_streaming_live_table() foram preteridas. O Databricks recomenda atualizar o código existente para usar a função create_streaming_table().

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>",
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)
Argumentos
name

Tipo: str

O nome da tabela.

Este parâmetro é obrigatório.
comment

Tipo: str

Uma descrição opcional para a tabela.
spark_conf

Tipo: dict

Uma lista opcional de configurações do Spark para a execução desta consulta.
table_properties

Tipo: dict

Uma lista opcional de propriedades da tabela.
partition_cols

Tipo: array

Uma lista opcional de uma ou mais colunas a serem usadas para particionar a tabela.
cluster_by

Tipo: array

Opcionalmente, habilite o clustering líquido na tabela e defina as colunas a serem usadas como chaves de clustering.

Consulte Utilize a clusterização líquida para tabelas Delta.
path

Tipo: str

Um local de armazenamento opcional para dados de tabela. Se não estiver definido, o sistema usará por defeito o local de armazenamento do pipeline.
schema

Tipo: str ou StructType

Uma definição de esquema opcional para a tabela. Os esquemas podem ser definidos como uma cadeia de caracteres DDL SQL ou com um Python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Tipo: dict

Restrições opcionais de qualidade de dados para a tabela. Consulte múltiplas expectativas.
row_filter (Visualização pública)

Tipo: str

Uma cláusula de filtro de linha opcional para a tabela. Consulte Publicar tabelas com filtros de linha e máscaras de coluna.

Controlar como as tabelas são materializadas

As tabelas também oferecem um controlo adicional da sua materialização:

  • Especifique como tabelas de de cluster usando cluster_by. Você pode usar o clustering líquido para acelerar as consultas. Consulte Usar agrupamento líquido para tabelas Delta.
  • Especifique como as tabelas são particionadas usando partition_cols.
  • Você pode definir as propriedades da tabela ao definir um modo de exibição ou tabela. Consulte as propriedades da tabela DLT .
  • Defina um local de armazenamento para os dados da tabela usando a configuração path. Por padrão, os dados da tabela são armazenados no local de armazenamento do pipeline se path não estiver definido.
  • Você pode usar colunas geradas em sua definição de esquema. Consulte Exemplo: Especifique um esquema e colunas de agrupamento.

Observação

Para tabelas com menos de 1 TB de tamanho, o Databricks recomenda permitir que a DLT controle a organização dos dados. Você não deve especificar colunas de partição, a menos que espere que sua tabela cresça além de um terabyte.

Exemplo: Especificar um esquema e colunas em cluster

Opcionalmente, você pode especificar um esquema de tabela usando um StructType Python ou uma cadeia de caracteres DDL SQL. Quando especificado com uma string DDL, a definição pode incluir colunas geradas.

O exemplo a seguir cria uma tabela chamada sales com um esquema especificado usando um Python StructType:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

O exemplo a seguir especifica o esquema para uma tabela usando uma cadeia de caracteres DDL, define uma coluna gerada e define colunas de clustering:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

Por padrão, a DLT infere o esquema da definição de table se você não especificar um esquema.

Exemplo: Especificar colunas de partição

O exemplo a seguir especifica o esquema para uma tabela usando uma cadeia de caracteres DDL, define uma coluna gerada e define uma coluna de partição:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Exemplo: Definir restrições de tabela

Importante

As restrições de tabela estão em visualização pública .

Ao especificar um esquema, você pode definir chaves primárias e estrangeiras. As restrições são informativas e não são aplicadas. Consulte a cláusula CONSTRAINT na referência da linguagem SQL.

O exemplo a seguir define uma tabela com uma restrição de chave primária e estrangeira:

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

Exemplo: Definir um filtro de linha e uma máscara de coluna

Importante

Os filtros de linha e as máscaras de coluna estão em Pré-visualização Pública .

Para criar uma exibição materializada ou uma tabela de streaming com um filtro de linha e uma máscara de coluna, use a cláusula ROW FILTER e a cláusula MASK. O exemplo a seguir demonstra como definir um modo de exibição materializado e uma tabela Streaming com um filtro de linha e uma máscara de coluna:

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

Para obter mais informações sobre filtros de linha e máscaras de coluna, consulte Publicar tabelas com filtros de linha e máscaras de coluna.

Configurar uma tabela de streaming para ignorar alterações em uma tabela de streaming de origem

Observação

  • O sinalizador skipChangeCommits funciona apenas com spark.readStream usando a função option(). Não é possível usar esse sinalizador em uma função dlt.read_stream().
  • Não é possível usar o flag skipChangeCommits quando a tabela de streaming de origem é definida como o destino de uma função apply_changes().

Por padrão, as tabelas de streaming exigem fontes somente de acréscimo. Quando uma tabela de streaming utiliza outra tabela de streaming como fonte e a tabela de streaming de origem requer atualizações ou eliminações, por exemplo, para o processamento do "direito a ser esquecido" segundo o GDPR, o sinalizador skipChangeCommits pode ser ativado durante a leitura da tabela de streaming de origem para ignorar essas alterações. Para obter mais informações sobre esse sinalizador, consulte Ignorar atualizações e exclui.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

Propriedades do Python DLT

As tabelas a seguir descrevem as opções e propriedades que você pode especificar ao definir tabelas e exibições com DLT:

@table ou @view
name

Tipo: str

Um nome opcional para a tabela ou exibição. Se não estiver definido, o nome da função será usado como o nome da tabela ou do modo de exibição.
comment

Tipo: str

Uma descrição opcional para a tabela.
spark_conf

Tipo: dict

Uma lista opcional de configurações do Spark para a execução desta consulta.
table_properties

Tipo: dict

Uma lista opcional de propriedades da tabela.
path

Tipo: str

Um local de armazenamento opcional para dados de tabela. Se não estiver definido, o sistema define por defeito o local de armazenamento do pipeline.
partition_cols

Tipo: a collection of str

Uma coleção opcional, por exemplo, uma list de uma ou mais colunas a serem usadas para particionar a tabela.
cluster_by

Tipo: array

Opcionalmente, ative o agrupamento líquido na tabela e defina as colunas a serem usadas como chaves de agrupamento.

Consulte Utilizar clusterização líquida para tabelas Delta.
schema

Tipo: str ou StructType

Uma definição de esquema opcional para a tabela. Os esquemas podem ser definidos como uma cadeia de caracteres DDL SQL ou com Python StructType.
temporary

Tipo: bool

Crie uma tabela, mas não publique metadados para a tabela. A palavra-chave temporary instrui a DLT a criar uma tabela que está disponível para o pipeline, mas não deve ser acessada fora do pipeline. Para reduzir o tempo de processamento, uma tabela temporária persiste durante o tempo de vida do pipeline que a cria, e não apenas uma única atualização.

O padrão é 'False'.
row_filter (Visualização pública)

Tipo: str

Uma cláusula de filtro de linha opcional para a tabela. Consulte Publicar tabelas com filtros de linha e máscaras de coluna.
Definição de tabela ou visualização
def <function-name>()

Uma função Python que define o conjunto de dados. Se o parâmetro name não estiver definido, <function-name> será usado como o nome do conjunto de dados de destino.
query

Uma instrução Spark SQL que retorna um Spark Dataset ou Koalas DataFrame.

Use dlt.read() ou spark.read.table() para executar uma leitura completa de um conjunto de dados definido no mesmo pipeline. Para ler um conjunto de dados externo, use a função spark.read.table(). Não é possível usar dlt.read() para ler conjuntos de dados externos. Como spark.read.table() pode ser usado para ler conjuntos de dados internos, conjuntos de dados definidos fora do pipeline atual e permite especificar opções para ler dados, o Databricks recomenda usá-lo em vez da função dlt.read().

Quando você define um conjunto de dados em um pipeline, por padrão, ele usará o catálogo e o esquema definidos na configuração do pipeline. Você pode usar a função spark.read.table() para ler de um conjunto de dados definido no pipeline sem qualquer qualificação. Por exemplo, para ler a partir de um conjunto de dados chamado customers:

spark.read.table("customers")

Você também pode usar a função spark.read.table() para ler uma tabela registrada no metastore qualificando opcionalmente o nome da tabela com o nome do banco de dados:

spark.read.table("sales.customers")

Use dlt.read_stream() ou spark.readStream.table() para executar uma leitura de streaming de um conjunto de dados definido no mesmo pipeline. Para executar uma leitura de streaming de um conjunto de dados externo, use o botão
função spark.readStream.table() Como spark.readStream.table() pode ser usado para ler conjuntos de dados internos, conjuntos de dados definidos fora do pipeline atual e permite especificar opções para ler dados, o Databricks recomenda usá-lo em vez da função dlt.read_stream().

Para definir uma consulta em uma função de table DLT usando sintaxe SQL, use a função spark.sql. Consulte Exemplo: acessar um conjunto de dados usando spark.sql. Para definir uma consulta numa função DLT table usando a sintaxe PySpark em Python, use.
Expectativas
@expect("description", "constraint")

Declarar uma restrição de qualidade de dados identificada por
description. Se uma linha violar a expectativa, inclua a linha no conjunto de dados de destino.
@expect_or_drop("description", "constraint")

Declarar uma restrição de qualidade de dados identificada por
description. Se uma linha violar a expectativa, solte a linha do conjunto de dados de destino.
@expect_or_fail("description", "constraint")

Declarar uma restrição de qualidade de dados identificada por
description. Se uma linha violar a expectativa, interrompa imediatamente a execução.
@expect_all(expectations)

Declare uma ou mais restrições de qualidade de dados.
expectations é um dicionário Python, onde a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar qualquer uma das expectativas, inclua a linha no conjunto de dados de destino.
@expect_all_or_drop(expectations)

Declare uma ou mais restrições de qualidade de dados.
expectations é um dicionário Python, onde a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar qualquer uma das expectativas, solte a linha do conjunto de dados de destino.
@expect_all_or_fail(expectations)

Declare uma ou mais restrições de qualidade de dados.
expectations é um dicionário Python, onde a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar quaisquer das expectativas, interrompa a execução imediatamente.

Captura de alterações em dados a partir de um fluxo de alterações com Python em DLT

Utilize a função apply_changes() na API Python para aplicar a funcionalidade de captura de dados de alteração (CDC) do DLT visando processar os dados de origem de um feed de dados de alterações (CDF).

Importante

Você deve declarar uma tabela de streaming de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de destino apply_changes(), você deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados que os campos sequence_by.

Para criar a tabela de destino necessária, você pode usar a função create_streaming_table() na interface DLT Python.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Observação

Para processamento APPLY CHANGES, o comportamento padrão para eventos INSERT e UPDATE é upsert eventos CDC da origem: atualizar quaisquer linhas na tabela de destino que correspondam à(s) chave(s) especificada(s) ou inserir uma nova linha quando um registro correspondente não existir na tabela de destino. A manipulação de eventos DELETE pode ser especificada com a condição APPLY AS DELETE WHEN.

Para saber mais sobre o processamento CDC com um fluxo de alterações, consulte as APIs APLICAR ALTERAÇÕES: Simplifique a captura de dados de alterações com DLT. Para obter um exemplo de como usar a função apply_changes(), consulte Exemplo: processamento de SCD tipo 1 e SCD tipo 2 com dados de origem CDF.

Importante

Você deve declarar uma tabela de streaming de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de destino apply_changes, você deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados que o campo sequence_by.

Consulte As APIs APPLY CHANGES: Simplifique a captura de dados de alteração com DLT.

Argumentos
target

Tipo: str

O nome da tabela que deve ser atualizado. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a função apply_changes().

Este parâmetro é obrigatório.
source

Tipo: str

A fonte de dados que contém os registros do CDC.

Este parâmetro é obrigatório.
keys

Tipo: list

A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino.

Você pode especificar:

- Uma lista de sequências de caracteres: ["userId", "orderId"]
- Uma lista de funções do Spark SQL col(): [col("userId"), col("orderId"]

Os argumentos de funções col() não podem incluir qualificadores. Por exemplo, pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é obrigatório.
sequence_by

Tipo: str ou col()

O nome da coluna que especifica a ordem lógica dos eventos CDC nos dados de origem. A DLT usa esse sequenciamento para lidar com eventos de alteração que chegam fora de ordem.

Você pode especificar:

- Uma corda: "sequenceNum"
- Uma função Spark SQL col(): col("sequenceNum")

Os argumentos para funções col() não devem incluir qualificadores. Por exemplo, pode usar col(userId), mas não pode usar col(source.userId).

A coluna especificada deve ser um tipo de dados classificável.

Este parâmetro é obrigatório.
ignore_null_updates

Tipo: bool

Permitir a ingestão de atualizações contendo um subconjunto das colunas alvo. Quando um evento CDC corresponde a uma linha existente e ignore_null_updates é True, as colunas com um null retêm seus valores existentes no destino. Isso também se aplica a colunas aninhadas com um valor de null. Quando ignore_null_updates é False, os valores existentes são substituídos por valores null.

Este parâmetro é opcional.

O padrão é False.
apply_as_deletes

Tipo: str ou expr()

Especifica quando um evento CDC deve ser tratado como um DELETE em vez de um upsert. Para lidar com dados fora de ordem, a linha excluída é temporariamente mantida como um marcador na tabela Delta subjacente, e uma vista é criada no metastore que filtra estes marcadores. O intervalo de retenção pode ser configurado com o
pipelines.cdc.tombstoneGCThresholdInSeconds propriedade da tabela.

Você pode especificar:

- Uma cadeia de caracteres: "Operation = 'DELETE'"
- Uma função Spark SQL expr(): expr("Operation = 'DELETE'")

Este parâmetro é opcional.
apply_as_truncates

Tipo: str ou expr()

Especifica quando um evento CDC deve ser tratado como uma tabela completa TRUNCATE. Como essa cláusula aciona um truncado completo da tabela de destino, ela deve ser usada apenas para casos de uso específicos que exijam essa funcionalidade.

O parâmetro apply_as_truncates é suportado apenas para SCD tipo 1. ** SCD tipo 2 não suporta operações de truncagem.

Você pode especificar:

- Uma corda: "Operation = 'TRUNCATE'"
- Uma função Spark SQL expr(): expr("Operation = 'TRUNCATE'")

Este parâmetro é opcional.
column_list

except_column_list

Tipo: list

Um subconjunto de colunas a serem incluídas na tabela de destino. Use column_list para especificar a lista completa de colunas a serem incluídas. Use except_column_list para especificar as colunas a serem excluídas. Você pode declarar qualquer valor como uma lista de cadeias de caracteres ou como funções de col() SQL do Spark:

- column_list = ["userId", "name", "city"].
- column_list = [col("userId"), col("name"), col("city")]
- except_column_list = ["operation", "sequenceNum"]
- except_column_list = [col("operation"), col("sequenceNum")

Os argumentos para as funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é opcional.

O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento column_list ou except_column_list é passado para a função.
stored_as_scd_type

Tipo: str ou int

Se deseja armazenar registros como SCD tipo 1 ou SCD tipo 2.

Defina como 1 para SCD tipo 1 ou 2 para SCD tipo 2.

Esta cláusula é facultativa.

O padrão é SCD tipo 1.
track_history_column_list

track_history_except_column_list

Tipo: list

Um subconjunto de colunas de saída a serem acompanhadas para o histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Utilização
track_history_except_column_list para especificar as colunas a serem excluídas do acompanhamento. Você pode declarar qualquer valor como uma lista de cadeias de caracteres ou como funções de col() SQL do Spark:
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Os argumentos das funções col() não podem incluir qualificadores. Por exemplo, podes usar col(userId), mas não podes usar col(source.userId).

Este parâmetro é opcional.

O padrão é incluir todas as colunas na tabela de destino quando nenhum track_history_column_list ou
O argumento track_history_except_column_list é passado para a função.

Alterar a captura de dados de snapshots de banco de dados com Python em DLT

Importante

A API APPLY CHANGES FROM SNAPSHOT está em Pré-visualização Pública .

Use a função apply_changes_from_snapshot() na API Python para aplicar a funcionalidade de captura de dados de alteração DLT (CDC) no processamento de dados de origem a partir de snapshots de base de dados.

Importante

Você deve declarar uma tabela de streaming de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de destino apply_changes_from_snapshot(), você também deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados que o campo sequence_by.

Para criar a tabela de destino necessária, você pode usar a função create_streaming_table() na interface DLT Python.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

Observação

Para processamento APPLY CHANGES FROM SNAPSHOT, o comportamento padrão é inserir uma nova linha quando um registro correspondente com a(s) mesma(s) chave(s) não existe no destino. Se existir um registro correspondente, ele será atualizado somente se algum dos valores na linha tiver sido alterado. As linhas com chaves presentes no destino, mas não mais presentes na origem, são excluídas.

Para saber mais sobre o processamento de captura de dados de alterações (CDC) com snapshots, consulte As APIs APPLY CHANGES: Simplificam a captura de dados de alterações com DLT. Para obter exemplos de como usar a função apply_changes_from_snapshot(), consulte os exemplos de ingestão periódica de instantâneos e de ingestão de instantâneos históricos .

Argumentos
target

Tipo: str

O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a função apply_changes().

Este parâmetro é obrigatório.
source

Tipo: str ou lambda function

O nome de uma tabela ou visualização para capturar periodicamente um instantâneo, ou uma função lambda Python que retorna o DataFrame do instantâneo a ser processado e a versão do instantâneo. Consulte Implementar o argumento source.

Este parâmetro é obrigatório.
keys

Tipo: list

A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino.

Você pode especificar:

- Uma lista de cadeias de texto: ["userId", "orderId"]
- Uma lista de funções do Spark SQL col(): [col("userId"), col("orderId"]

Os argumentos de funções col() não podem incluir qualificadores. Por exemplo, pode-se usar col(userId), mas não se pode usar col(source.userId).

Este parâmetro é obrigatório.
stored_as_scd_type

Tipo: str ou int

Se deseja armazenar registros como SCD tipo 1 ou SCD tipo 2.

Defina como 1 para SCD tipo 1 ou 2 para SCD tipo 2.

Esta cláusula é facultativa.

O padrão é SCD tipo 1.
track_history_column_list

track_history_except_column_list

Tipo: list

Um subconjunto de colunas de saída a serem monitorizadas para seguimento na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Utilização
track_history_except_column_list especificar as colunas a serem excluídas do acompanhamento. Você pode declarar qualquer valor como uma lista de cadeias de caracteres ou como funções de col() SQL do Spark:
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Os argumentos de funções col() não podem incluir qualificadores. Por exemplo, pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é opcional.

O padrão é incluir todas as colunas na tabela de destino quando nenhum track_history_column_list ou
O argumento track_history_except_column_list é passado para a função.

Implementar o argumento source

A função apply_changes_from_snapshot() inclui o argumento source. Para processar snapshots históricos, espera-se que o argumento source seja uma função lambda Python que retorna dois valores para a função apply_changes_from_snapshot(): um Python DataFrame contendo os dados de snapshot a serem processados e uma versão de snapshot.

A assinatura da função lambda é a seguinte:

lambda Any => Optional[(DataFrame, Any)]
  • O argumento para a função lambda é a versão de snapshot processada mais recentemente.
  • O valor de retorno da função lambda é None ou uma tupla de dois valores: O primeiro valor da tupla é um DataFrame contendo o instantâneo a ser processado. O segundo valor da tupla é a versão do instantâneo que representa a ordem lógica desse instantâneo.

Um exemplo que implementa e chama a função lambda:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

O ambiente de execução DLT executa as seguintes etapas cada vez que o pipeline contendo a função apply_changes_from_snapshot() é desencadeado:

  1. Executa a função next_snapshot_and_version para carregar o próximo snapshot DataFrame e a versão de snapshot correspondente.
  2. Se nenhum DataFrame retornar, a execução será encerrada e a atualização do pipeline será marcada como concluída.
  3. Deteta as alterações no novo instantâneo e as aplica incrementalmente à tabela de destino.
  4. Retorna à etapa #1 para carregar o próximo instantâneo e sua versão.

Limitações

A interface DLT Python tem a seguinte limitação:

A função pivot() não é suportada. A operação pivot no Spark requer o carregamento ansioso de dados de entrada para calcular o esquema de saída. Esta capacidade não é suportada na DLT.