Использование разностных таблиц с потоковыми данными
Все данные, которые мы изучили до этой точки, были статическими данными в файлах. Однако многие сценарии аналитики данных включают потоковую передачу данных, которые нужно обрабатывать практически в режиме реального времени. Например, может потребоваться записывать показания устройств Интернета вещей и сохранять их в таблице по мере возникновения. Spark обрабатывает пакетные данные и потоковую передачу данных таким же образом, что позволяет обрабатывать потоковые данные в режиме реального времени с помощью того же API.
Структурированная потоковая передача Spark
Обычное решение потоковой обработки включает в себя следующее:
- Постоянно считывает поток данных из источника.
- При необходимости обработка данных для выбора определенных полей, статистических и групповых значений или управления данными.
- Запись результатов в приемник.
Spark включает встроенную поддержку потоковой передачи данных посредством API Spark Structured Streaming, основанного на безграничном кадре данных, в котором данные потоковой передачи записываются для обработки. Кадр данных структурированной потоковой передачи Spark может считывать данные из различных типов источников потоковой передачи, включая:
- Сетевые порты
- Службы брокера сообщений в режиме реального времени, такие как Центры событий Azure или Kafka
- Расположения файловой системы.
Совет
Дополнительные сведения см. в руководстве по программированию Spark Structured Streaming в документации Spark.
Потоковая передача с разностными таблицами
Вы можете использовать разностную таблицу в качестве источника или приемника для структурированной потоковой передачи Spark. Например, можно записать поток данных в режиме реального времени с устройства Интернета вещей и записать поток непосредственно в таблицу Delta в качестве приемника. Затем вы можете запросить таблицу, чтобы просмотреть последние потоковые данные. Кроме того, вы можете прочитать Delta в качестве источника потоковой передачи, что позволяет практически в реальном времени получать отчеты по мере добавления новых данных в таблицу.
Использование разностной таблицы в качестве источника потоковой передачи
В следующем примере PySpark создается таблица Delta для хранения сведений о заказах в Интернете:
%%sql
CREATE TABLE orders_in
(
OrderID INT,
OrderDate DATE,
Customer STRING,
Product STRING,
Quantity INT,
Price DECIMAL
)
USING DELTA;
В таблицу orders_in вставляется гипотетический поток данных интернет-заказов:
%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
(3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
(3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
(3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
(3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
(3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);
Чтобы проверить, можно считывать и отображать данные из входной таблицы:
# Read and display the input table
df = spark.read.format("delta").table("orders_in")
display(df)
Затем данные загружаются в потоковый кадр данных из таблицы Delta:
# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
.option("ignoreChanges", "true") \
.table("orders_in")
Примечание.
При использовании разностной таблицы в качестве источника потоковой передачи в поток можно включить только операции добавления . Изменения данных могут привести к ошибке, если не указать или ignoreDeletes
не указать ignoreChanges
параметр.
Вы можете проверить потоковую передачу потока с помощью isStreaming
свойства, которое должно возвращать значение True:
# Verify that the stream is streaming
stream_df.isStreaming
Преобразование потока данных
После чтения данных из таблицы Delta в потоковый кадр данных можно использовать API структурированной потоковой передачи Spark для обработки. Например, можно подсчитать количество заказов, размещенных каждую минуту, и отправить агрегированные результаты в подчиненный процесс для визуализации почти в режиме реального времени.
В этом примере все строки со значением NULL в столбце Price фильтруются, а новые столбцы добавляются для IsBike и Total.
from pyspark.sql.functions import col, expr
transformed_df = stream_df.filter(col("Price").isNotNull()) \
.withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
.withColumn('Total', expr("Quantity * Price").cast('decimal'))
Использование разностной таблицы в качестве приемника потоковой передачи
Затем поток данных записывается в таблицу Delta:
# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)
print("Streaming to orders_processed...")
Примечание.
Параметр checkpointLocation
используется для записи файла контрольных точек, который отслеживает состояние обработки потока. Этот файл позволяет восстановиться после сбоя в точке, в которой потоковая обработка прекратилась.
После запуска потокового процесса можно запросить таблицу Delta Lake, чтобы увидеть, что находится в выходной таблице. Может потребоваться небольшая задержка, прежде чем запрашивать таблицу.
%%sql
SELECT *
FROM orders_processed
ORDER BY OrderID;
В результатах этого запроса заказ 3005 исключается из-за того, что он имел значение NULL в столбце Price. При этом отображаются два столбца, которые были добавлены во время преобразования — IsBike и Total.
OrderID | Датазаказа | Клиент | Продукт | Количество | Цена | IsBike | Итог |
---|---|---|---|---|---|---|---|
3001 | 2023-09-01 | Ян | Дорожный велосипед Красный | 1 | 1200 | 1 | 1200 |
3002 | 2023-09-01 | Карлсон | Горный велосипед Silver | 1 | 1500 | 1 | 1500 |
3003 | 2023-09-02 | Wilson | Дорожный велосипед желтый | 2 | 1350 | 1 | 2700 |
3004 | 2023-09-02 | Ян | Дорожное колесо переднего плана | 1 | 115 | 0 | 115 |
По завершении остановите потоковую передачу данных, чтобы избежать ненужных затрат на обработку с помощью stop
метода:
# Stop the streaming data to avoid excessive processing costs
deltastream.stop()
Совет
Дополнительные сведения об использовании разностных таблиц для потоковой передачи данных см. в документации Delta Lake.