Запрос потоковых данных
С помощью Azure Databricks можно запрашивать источники данных потоковой передачи с помощью структурированной потоковой передачи. Azure Databricks обеспечивает обширную поддержку потоковых рабочих нагрузок в Python и Scala и поддерживает большинство функций структурированной потоковой передачи с помощью SQL.
В следующих примерах показано использование приемника памяти для ручной проверки потоковых данных во время интерактивной разработки в записных книжках. Из-за ограничений вывода строк в пользовательском интерфейсе записной книжки могут не наблюдаться все данные, считываемые потоковыми запросами. В рабочих нагрузках следует запускать только потоковые запросы, записывая их в целевую table или внешнюю систему.
Примечание.
Поддержка SQL для интерактивных запросов на потоковую передачу данных ограничена записными книжками, работающими на всех вычислительных ресурсах. Вы также можете использовать SQL при объявлении потоковой передачи tables в Databricks SQL или Delta Live Tables. См. загрузка данных с использованием потоковой передачи tables в Databricks SQL и Что представляет собой Delta Live Tables?.
Запрос данных из систем потоковой передачи
Azure Databricks предоставляет средства чтения потоковых данных для следующих систем потоковой передачи:
- Kafka
- Kinesis
- PubSub
- Пульсар
При инициализации запросов к этим системам необходимо указать сведения о конфигурации, которые зависят от настроенной среды и системы, из которой вы выбираете чтение. См. раздел "Настройка источников данных потоковой передачи".
Распространенные рабочие нагрузки, связанные с потоковой передачей, включают прием данных в lakehouse и потоковую обработку для приемника данных во внешних системах. Дополнительные сведения о рабочих нагрузках потоковой передачи см. в статье "Потоковая передача" в Azure Databricks.
В следующих примерах демонстрируется интерактивное потоковое чтение из Kafka:
Python
display(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'latest'
);
Запросить table для потокового чтения
Azure Databricks создает все tables с помощью Delta Lake по умолчанию. При выполнении потокового запроса к Delta tableзапрос автоматически выбирает новые записи при фиксации версии table. По умолчанию потоковые запросы ожидают, что источник tables содержит только добавленные записи. Если вам нужно работать с потоковыми данными, содержащими обновления и удаления, Databricks рекомендует использовать delta Live Tables и APPLY CHANGES INTO
. См. API "APPLY CHANGES": упрощение отслеживания изменений данных с помощью Delta Live Tables.
В следующих примерах демонстрируется выполнение интерактивной потоковой передачи из table:
Python
display(spark.readStream.table("table_name"))
SQL
SELECT * FROM STREAM table_name
Запрос данных в облачном хранилище объектов с помощью автозагрузчика
Вы можете передавать данные из облачного хранилища объектов с помощью автозагрузчика, соединителя облачных данных Azure Databricks. Соединитель можно использовать с файлами, хранящимися в Unity Catalogvolumes или в других расположениях облачного хранилища объектов. Databricks рекомендует использовать volumes для управления доступом к данным в облачном хранилище объектов. См. статью "Подключение к источникам данных".
Azure Databricks оптимизирует этот соединитель для приема потоковых данных в облачном хранилище объектов, которое хранится в популярных структурированных, полуструктурированных и неструктурированных форматах. Databricks рекомендует хранить поступающие данные почти в исходном формате для максимальной пропускной способности и уменьшения потенциальной потери данных из-за поврежденных записей или изменений schema.
Дополнительные рекомендации по приему данных из облачного хранилища объектов см . в разделе "Прием данных" в databricks lakehouse.
В следующих примерах показано интерактивное потоковое чтение из каталога JSON-файлов в томе:
Python
display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))
SQL
SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')