Carregue e processe dados incrementalmente com fluxos Delta Live Tables
Este artigo explica o que são fluxos e como você pode usar fluxos em pipelines Delta Live Tables para processar dados incrementalmente de uma fonte para uma tabela de streaming de destino. No Delta Live Tables, os fluxos são definidos de duas maneiras:
- Um fluxo é definido automaticamente quando você cria uma consulta que atualiza uma tabela de streaming.
- Delta Live Tables também fornece funcionalidade para definir explicitamente fluxos para processamento mais complexo, como anexar a uma tabela de streaming de várias fontes de streaming.
Este artigo discute os fluxos implícitos que são criados quando você define uma consulta para atualizar uma tabela de streaming e, em seguida, fornece detalhes sobre a sintaxe para definir fluxos mais complexos.
O que é um fluxo?
No Delta Live Tables, um fluxo é uma consulta de streaming que processa dados de origem incrementalmente para atualizar uma tabela de streaming de destino. A maioria dos conjuntos de dados Delta Live Tables criados em um pipeline define o fluxo como parte da consulta e não requer a definição explícita do fluxo. Por exemplo, você cria uma tabela de streaming no Delta Live Tables em um único comando DDL em vez de usar instruções de tabela e fluxo separadas para criar a tabela de streaming:
Nota
Este CREATE FLOW
exemplo é fornecido apenas para fins ilustrativos e inclui palavras-chave que não são válidas sintaxe Delta Live Tables.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
Além do fluxo padrão definido por uma consulta, as interfaces Python e SQL do Delta Live Tables fornecem a funcionalidade de fluxo de acréscimo. O fluxo de acréscimo suporta processamento que requer a leitura de dados de várias fontes de streaming para atualizar uma única tabela de streaming. Por exemplo, você pode usar a funcionalidade de fluxo de acréscimo quando tiver uma tabela e um fluxo de streaming existentes e quiser adicionar uma nova fonte de streaming que grave nessa tabela de streaming existente.
Use o fluxo de acréscimo para gravar em uma tabela de streaming a partir de vários fluxos de origem
Use o @append_flow
decorador na interface Python ou a CREATE FLOW
cláusula na interface SQL para gravar em uma tabela de streaming a partir de várias fontes de streaming. Use o fluxo de acréscimo para tarefas de processamento como as seguintes:
- Adicione fontes de streaming que acrescentam dados a uma tabela de streaming existente sem exigir uma atualização completa. Por exemplo, você pode ter uma tabela combinando dados regionais de cada região em que opera. À medida que novas regiões são distribuídas, você pode adicionar os novos dados de região à tabela sem executar uma atualização completa. Veja Exemplo: Gravar em uma tabela de streaming a partir de vários tópicos do Kafka.
- Atualize uma tabela de streaming anexando dados históricos ausentes (backfilling). Por exemplo, você tem uma tabela de streaming existente que é escrita por um tópico do Apache Kafka. Você também tem dados históricos armazenados em uma tabela que precisa ser inserida exatamente uma vez na tabela de streaming e não pode transmitir os dados porque seu processamento inclui a execução de uma agregação complexa antes de inserir os dados. Consulte Exemplo: Executar um backfill de dados único.
- Combine dados de várias fontes e grave em uma única tabela de streaming em vez de usar a
UNION
cláusula em uma consulta. Usar o processamento de fluxo de acréscimo em vez de permitir que você atualize a tabela deUNION
destino incrementalmente sem executar uma atualização de atualização completa. Consulte Exemplo: Use o processamento de fluxo de acréscimo em vez de UNION.
O destino para a saída de registros pelo processamento de fluxo de acréscimo pode ser uma tabela existente ou uma nova tabela. Para consultas Python, use a função create_streaming_table() para criar uma tabela de destino.
Importante
- Se você precisar definir restrições de qualidade de dados com expectativas, defina as expectativas na tabela de destino como parte da função ou em uma definição de
create_streaming_table()
tabela existente. Não é possível definir expectativas na@append_flow
definição. - Os fluxos são identificados por um nome de fluxo, e esse nome é usado para identificar pontos de verificação de streaming. O uso do nome do fluxo para identificar o ponto de verificação significa o seguinte:
- Se um fluxo existente em um pipeline for renomeado, o ponto de verificação não será transferido e o fluxo renomeado será efetivamente um fluxo totalmente novo.
- Não é possível reutilizar um nome de fluxo em um pipeline, porque o ponto de verificação existente não corresponderá à nova definição de fluxo.
A sintaxe é a seguinte:@append_flow
Python
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
target_table BY NAME
SELECT * FROM
source;
Exemplo: Escrever em uma tabela de streaming a partir de vários tópicos do Kafka
Os exemplos a seguir criam uma tabela de streaming nomeada kafka_target
e grava nessa tabela de streaming a partir de dois tópicos do Kafka:
Python
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Para saber mais sobre a função com valor de read_kafka()
tabela usada nas consultas SQL, consulte read_kafka na referência da linguagem SQL.
Exemplo: Executar um preenchimento de dados único
Os exemplos a seguir executam uma consulta para acrescentar dados históricos a uma tabela de streaming:
Nota
Para garantir um verdadeiro preenchimento único quando a consulta de preenchimento fizer parte de um pipeline executado de forma agendada ou contínua, remova a consulta depois de executar o pipeline uma vez. Para acrescentar novos dados se eles chegarem no diretório de preenchimento, deixe a consulta no lugar.
Python
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Exemplo: Use o processamento de fluxo de acréscimo em vez de UNION
Em vez de usar uma consulta com uma UNION
cláusula, você pode usar consultas de fluxo de acréscimo para combinar várias fontes e gravar em uma única tabela de streaming. Usar consultas de fluxo de acréscimo em vez de UNION
permitir que você anexe a uma tabela de streaming de várias fontes sem executar uma atualização completa.
O exemplo Python a seguir inclui uma consulta que combina várias fontes de dados com uma UNION
cláusula:
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
Os exemplos a seguir substituem a UNION
consulta por consultas de fluxo de acréscimo:
Python
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/us",
"csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/eu",
"csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/apac",
"csv"
);