Загрузка и обработка данных постепенно с помощью потоков DLT
В этой статье объясняется, что такое потоки, а также как можно использовать потоки внутри конвейеров DLT для поэтапной обработки данных из источника в целевой потоковой таблице. В DLT потоки определяются двумя способами:
- Поток определяется автоматически при создании запроса, обновляющего потоковую таблицу.
- DLT также предоставляет функции для явного задания потоков для более сложной обработки, например добавления в потоковую таблицу из нескольких потоковых источников.
В этой статье рассматриваются неявные потоки, созданные при определении запроса для обновления таблицы потоковой передачи, а затем содержатся сведения о синтаксисе для определения более сложных потоков.
Что такое поток?
В DLT поток с обозначением представляет собой потоковый запрос, обрабатывающий исходные данные инкрементально для обновления целевой потоковой таблицы. Большинство наборов данных DLT, создаваемых в конвейере, определяют поток как часть запроса и не требуют явного определения потока. Например, вы создаете потоковую таблицу в DLT с помощью одной команды DDL вместо использования отдельных операторов создания таблиц и потоков для создания потоковой таблицы.
Заметка
Этот CREATE FLOW
пример предоставляется только для иллюстрирующих целей и включает ключевые слова, которые не являются допустимым синтаксисом DLT.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
Помимо потока по умолчанию, определенного запросом, интерфейсы DLT Python и SQL предоставляют функциональность добавления потока . Поток добавления поддерживает обработку, требующую чтения данных из нескольких источников потоковой передачи для обновления одной потоковой таблицы. Например, вы можете использовать функциональность добавления к потоку, если у вас есть существующий поток и потоковая таблица и требуется добавить новый источник потоковой передачи, который записывается в эту существующую потоковую таблицу.
Использование потока добавления для записи в таблицу потоковой передачи из нескольких исходных потоков
Используйте декоратор @append_flow
в интерфейсе Python или предложение CREATE FLOW
в интерфейсе SQL для записи в потоковую таблицу из нескольких потоковых источников. Используйте поток добавления для задач обработки, таких как:
- Добавьте источники потоковой передачи, добавляющие данные в существующую потоковую таблицу без полного обновления. Например, у вас может быть таблица, объединяющая региональные данные из каждого региона, в который вы работаете. При развертывании новых регионов можно добавить новые данные региона в таблицу без полного обновления. См. пример : запись в потоковую таблицу из нескольких топиков Kafka.
- Обновите потоковую таблицу данных, добавив отсутствующие исторические данные (дозаполнение). Например, у вас есть существующая стриминговая таблица, в которую записывается тема Apache Kafka. У вас также есть исторические данные, хранящиеся в таблице, которые нужно вставить ровно один раз в потоковую таблицу, и вы не можете выполнять потоковую обработку данных, так как ваша обработка включает выполнение сложной агрегации перед вставкой данных. См. Пример: Запуск однократной обратной записи данных.
- Объедините данные из нескольких источников и запишите в одну потоковую таблицу вместо использования выражения
UNION
в запросе. Использование потоковой обработки добавлений вместоUNION
позволяет обновлять целевую таблицу поэтапно без выполнения полного обновления. См. пример : используйте обработку потока добавления вместоUNION
.
Целевым объектом для записи выходных данных при обработке потока добавления может быть существующая таблица или новая таблица. Для запросов Python используйте функцию create_streaming_table() для создания целевой таблицы.
Важный
- Если вам нужно задать ограничения качества данных с помощью ожиданий , определите ожидания в целевой таблице как часть функции
create_streaming_table()
или в определении существующей таблицы. Невозможно определить ожидания в определении@append_flow
. - Потоки определяются именем потока, и это имя используется для идентификации контрольных точек потоковой передачи. Использование имени потока для идентификации контрольной точки означает следующее:
- Если существующий поток в конвейере переименован, контрольная точка не переносится, и переименованный поток фактически является совершенно новым потоком.
- Невозможно повторно использовать имя потока в конвейере, так как существующая контрольная точка не будет соответствовать новому определению потока.
Ниже приведен синтаксис для @append_flow
:
Питон
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
target_table BY NAME
SELECT * FROM
source;
Пример: Запись в потоковую таблицу из нескольких топиков Kafka
В следующих примерах создается таблица потоковой передачи с именем kafka_target
и записывается в ту таблицу потоковой передачи из двух разделов Kafka:
Питон
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Дополнительные сведения о read_kafka()
табличной функции, используемой в запросах SQL, см.: read_kafka в справочнике по языку SQL.
В Python можно программно создать несколько потоков, предназначенных для одной таблицы. В следующем примере показан этот шаблон для списка разделов Kafka.
Заметка
Этот шаблон имеет те же требования, что и при использовании цикла for
для создания таблиц. Необходимо явно передать значение Python в функцию, определяющую поток. См. раздел Создание таблиц в цикле for
.
import dlt
dlt.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
пример. Выполнение однократного резервного заполнения данных
В следующих примерах выполняется запрос для добавления исторических данных в таблицу потоковой передачи:
Заметка
Чтобы гарантировать действительно одноразовое заполнение, когда запрос на обратное заполнение является частью конвейера, работающего по расписанию или непрерывно, после однократного запуска конвейера удалите этот запрос. Чтобы добавлять новые данные, если они поступают в каталог для обратного заполнения, оставьте запрос в исходном состоянии.
Питон
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
пример : Используйте обработку потока добавления вместо UNION
Вместо использования запроса с предложением UNION
можно использовать запросы потока добавления для объединения нескольких источников и записи в одну потоковую таблицу. Использование запросов на добавление данных вместо UNION
позволяет добавлять данные в потоковую таблицу из нескольких источников без выполнения полного обновления.
В следующем примере Python содержится запрос, который объединяет несколько источников данных с предложением UNION
:
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
Следующие примеры заменяют запрос UNION
на запросы последовательного добавления:
Питон
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/us",
"csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/eu",
"csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/apac",
"csv"
);