Использование разностных таблиц с потоковыми данными

Завершено

Все данные, которые мы изучили до этой точки, были статическими данными в файлах. Однако многие сценарии аналитики данных включают потоковую передачу данных, которые нужно обрабатывать практически в режиме реального времени. Например, может потребоваться записывать показания устройств Интернета вещей и сохранять их в таблице по мере возникновения. Spark обрабатывает пакетные данные и потоковую передачу данных таким же образом, что позволяет обрабатывать потоковые данные в режиме реального времени с помощью того же API.

Структурированная потоковая передача Spark

Обычное решение потоковой обработки включает в себя следующее:

  • Постоянно считывает поток данных из источника.
  • При необходимости обработка данных для выбора определенных полей, статистических и групповых значений или управления данными.
  • Запись результатов в приемник.

Spark включает встроенную поддержку потоковой передачи данных посредством API Spark Structured Streaming, основанного на безграничном кадре данных, в котором данные потоковой передачи записываются для обработки. Кадр данных структурированной потоковой передачи Spark может считывать данные из различных типов источников потоковой передачи, включая:

  • Сетевые порты
  • Службы брокера сообщений в режиме реального времени, такие как Центры событий Azure или Kafka
  • Расположения файловой системы.

Совет

Дополнительные сведения см. в руководстве по программированию Spark Structured Streaming в документации Spark.

Потоковая передача с разностными таблицами

Вы можете использовать разностную таблицу в качестве источника или приемника для структурированной потоковой передачи Spark. Например, можно записать поток данных в режиме реального времени с устройства Интернета вещей и записать поток непосредственно в таблицу Delta в качестве приемника. Затем вы можете запросить таблицу, чтобы просмотреть последние потоковые данные. Кроме того, вы можете прочитать Delta в качестве источника потоковой передачи, что позволяет практически в реальном времени получать отчеты по мере добавления новых данных в таблицу.

Использование разностной таблицы в качестве источника потоковой передачи

В следующем примере PySpark создается таблица Delta для хранения сведений о заказах в Интернете:

%%sql
CREATE TABLE orders_in
(
        OrderID INT,
        OrderDate DATE,
        Customer STRING,
        Product STRING,
        Quantity INT,
        Price DECIMAL
)
USING DELTA;

В таблицу orders_in вставляется гипотетический поток данных интернет-заказов:

%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
    (3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
    (3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
    (3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
    (3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
    (3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);

Чтобы проверить, можно считывать и отображать данные из входной таблицы:

# Read and display the input table
df = spark.read.format("delta").table("orders_in")

display(df)

Затем данные загружаются в потоковый кадр данных из таблицы Delta:

# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .table("orders_in")

Примечание.

При использовании разностной таблицы в качестве источника потоковой передачи в поток можно включить только операции добавления . Изменения данных могут привести к ошибке, если не указать или ignoreDeletes не указать ignoreChanges параметр.

Вы можете проверить потоковую передачу потока с помощью isStreaming свойства, которое должно возвращать значение True:

# Verify that the stream is streaming
stream_df.isStreaming

Преобразование потока данных

После чтения данных из таблицы Delta в потоковый кадр данных можно использовать API структурированной потоковой передачи Spark для обработки. Например, можно подсчитать количество заказов, размещенных каждую минуту, и отправить агрегированные результаты в подчиненный процесс для визуализации почти в режиме реального времени.

В этом примере все строки со значением NULL в столбце Price фильтруются, а новые столбцы добавляются для IsBike и Total.

from pyspark.sql.functions import col, expr

transformed_df = stream_df.filter(col("Price").isNotNull()) \
    .withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
    .withColumn('Total', expr("Quantity * Price").cast('decimal'))

Использование разностной таблицы в качестве приемника потоковой передачи

Затем поток данных записывается в таблицу Delta:

# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)

print("Streaming to orders_processed...")

Примечание.

Параметр checkpointLocation используется для записи файла контрольных точек, который отслеживает состояние обработки потока. Этот файл позволяет восстановиться после сбоя в точке, в которой потоковая обработка прекратилась.

После запуска потокового процесса можно запросить таблицу Delta Lake, чтобы увидеть, что находится в выходной таблице. Может потребоваться небольшая задержка, прежде чем запрашивать таблицу.

%%sql
SELECT *
    FROM orders_processed
    ORDER BY OrderID;

В результатах этого запроса заказ 3005 исключается из-за того, что он имел значение NULL в столбце Price. При этом отображаются два столбца, которые были добавлены во время преобразования — IsBike и Total.

OrderID Датазаказа Клиент Продукт Количество Цена IsBike Итог
3001 2023-09-01 Ян Дорожный велосипед Красный 1 1200 1 1200
3002 2023-09-01 Карлсон Горный велосипед Silver 1 1500 1 1500
3003 2023-09-02 Wilson Дорожный велосипед желтый 2 1350 1 2700
3004 2023-09-02 Ян Дорожное колесо переднего плана 1 115 0 115

По завершении остановите потоковую передачу данных, чтобы избежать ненужных затрат на обработку с помощью stop метода:

# Stop the streaming data to avoid excessive processing costs
deltastream.stop()

Совет

Дополнительные сведения об использовании разностных таблиц для потоковой передачи данных см. в документации Delta Lake.