Загрузка данных с помощью таблиц потоковой передачи в Databricks SQL
Databricks рекомендует использовать потоковые таблицы для приема данных с помощью Databricks SQL. Потоковая таблица — это таблица , зарегистрированная в каталоге Unity с дополнительной поддержкой потоковой или добавочной обработки данных. Конвейер разностных динамических таблиц автоматически создается для каждой потоковой таблицы. Таблицы потоковой передачи можно использовать для добавочной загрузки данных из Kafka и облачного хранилища объектов.
В этой статье показано использование потоковых таблиц для загрузки данных из облачного хранилища объектов, настроенного в качестве тома каталога Unity (рекомендуется) или внешнего расположения.
Примечание.
Чтобы узнать, как использовать таблицы Delta Lake в качестве источников потоковой передачи и приемников, см. раздел потоковой передачи и записи в разностной таблице.
Внимание
Потоковые таблицы, созданные в Databricks SQL, поддерживаются бессерверным конвейером разностных динамических таблиц. Рабочая область должна поддерживать бессерверные конвейеры для использования этой функции.
Перед началом работы
Перед началом работы необходимо выполнить следующие требования.
Требования к рабочей области:
- Учетная запись Azure Databricks с включенным бессерверным доступом. Дополнительные сведения см. в разделе "Включение бессерверных хранилищ SQL".
- Рабочая область с включенным каталогом Unity. Дополнительные сведения см. в разделе "Настройка каталога Unity" и управление ими.
Требования к вычислениям:
Необходимо использовать одно из следующих элементов:
Хранилище SQL, использующее
Current
канал.Вычисление с общим режимом доступа в Databricks Runtime 13.3 LTS или более поздней версии.
Вычисление с одним режимом доступа пользователей в Databricks Runtime 15.4 LTS или более поздней версии.
В Databricks Runtime 15.3 и ниже вы не можете использовать однопользовательские вычисления для запроса таблиц потоковой передачи, принадлежащих другим пользователям. Вы можете использовать вычисление одного пользователя в Databricks Runtime 15.3 и ниже, только если вы владеете потоковой таблицей. Создатель таблицы является владельцем.
Databricks Runtime 15.4 LTS и более поздних версий поддерживают запросы к таблицам, созданным Delta Live Table для вычислений одного пользователя, независимо от владения таблицами. Чтобы воспользоваться преимуществами фильтрации данных, предоставляемых в Databricks Runtime 15.4 LTS и более поздних версиях, необходимо убедиться, что рабочая область включена для бессерверных вычислений , так как функции фильтрации данных, поддерживающие таблицы, созданные Разностными динамическими таблицами, выполняются на бессерверных вычислениях. Вы можете взимать плату за бессерверные вычислительные ресурсы при использовании отдельных пользователей вычислений для выполнения операций фильтрации данных. Подробные инструкции по управлению доступом для отдельных пользователей.
Требования к разрешениям:
- Привилегия
READ FILES
во внешнем расположении каталога Unity. Дополнительные сведения см. в статье "Создание внешнего расположения для подключения облачного хранилища к Azure Databricks". - Привилегия
USE CATALOG
каталога, в котором создается потоковая таблица. - Привилегия
USE SCHEMA
схемы, в которой создается потоковая таблица. - Привилегия
CREATE TABLE
схемы, в которой создается потоковая таблица.
Другие требования:
Путь к исходным данным.
Пример пути тома:
/Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>
Пример пути к внешнему расположению:
abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis
Примечание.
В этой статье предполагается, что данные, к которым требуется загрузить, хранятся в облачном хранилище, соответствующем тому каталога Unity или внешнему расположению, к которому у вас есть доступ.
Обнаружение и предварительный просмотр исходных данных
На боковой панели рабочей области щелкните "Запросы" и нажмите кнопку "Создать запрос".
В редакторе запросов выберите хранилище SQL, использующее
Current
канал из раскрывающегося списка.Вставьте приведенные ниже значения в редактор, заменив значения в угловых скобках (
<>
) для сведений, определяющих исходные данные, и нажмите кнопку "Выполнить".Примечание.
При запуске
read_files
табличной функции могут возникнуть ошибки вывода схемы, если значения по умолчанию для функции не удается проанализировать данные. Например, может потребоваться настроить многострочный режим для файлов CSV или JSON. Список параметров синтаксического анализа см. в разделе read_files табличное значение функции./* Discover your data in a volume */ LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>" /* Preview your data in a volume */ SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10 /* Discover your data in an external location */ LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>" /* Preview your data */ SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
Загрузка данных в потоковую таблицу
Чтобы создать потоковую таблицу из данных в облачном хранилище объектов, вставьте следующую команду в редактор запросов и нажмите кнопку "Выполнить".
/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')
/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')
Настройка канала среды выполнения
Потоковая передача таблиц, созданных с помощью хранилищ SQL, автоматически обновляется с помощью конвейера Delta Live Tables. Конвейеры delta Live Tables используют среду выполнения в канале current
по умолчанию. Ознакомьтесь с заметками о выпуске Delta Live Tables и процессом обновления выпуска, чтобы узнать о процессе выпуска.
Databricks рекомендует использовать current
канал для рабочих нагрузок. Новые функции сначала выпускаются в preview
канале. Конвейер можно задать для канала delta Live Table для предварительной версии, чтобы протестировать новые функции, указав preview
в качестве свойства таблицы. Это свойство можно указать при создании таблицы или после создания таблицы с помощью инструкции ALTER.
В следующем примере кода показано, как настроить канал для предварительного просмотра в инструкции CREATE:
CREATE OR REPLACE MATERIALIZED VIEW foo.default.bar
TBLPROPERTIES ('pipelines.channel' = 'preview') as
SELECT
*
FROM
range(5)
обновите потоковую таблицу с помощью конвейера DLT
В этом разделе описываются шаблоны обновления таблицы потоковой передачи с последними данными, доступными из источников, определенных в запросе.
При обновлении потоковой таблицы CREATE
или REFRESH
обработка выполняется с использованием конвейера Delta Live Tables с бессерверной архитектурой. Каждая потоковая таблица, которую вы определяете, имеет ассоциированный конвейер Delta Live Tables.
После выполнения команды REFRESH
возвращается ссылка конвейера DLT. Чтобы проверить состояние обновления, можно использовать ссылку конвейера DLT.
Примечание.
Только владелец таблицы может обновить потоковую таблицу, чтобы получить последние данные. Пользователь, создающий таблицу, является владельцем, и владелец не может быть изменен. Вам может потребоваться обновить таблицу потоковой передачи перед использованием запросов перемещения по времени.
См. Что такое Delta Live Tables?.
Принимайте только новые данные
По умолчанию функция read_files
считывает все существующие данные в исходном каталоге во время создания таблицы, а затем обрабатывает вновь поступающие записи с каждым обновлением.
Чтобы избежать приема данных, которые уже существуют в исходном каталоге во время создания таблицы, задайте параметр includeExistingFiles
для false
. Это означает, что только данные, поступающие в каталог после создания таблицы, обрабатываются. Например:
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
includeExistingFiles => false)
Полное обновление таблицы потоковой передачи
Полные обновления повторно обрабатывают все данные, доступные в источнике с помощью последнего определения. Не рекомендуется вызывать полные обновления в источниках, которые не хранят всю историю данных или имеют короткие периоды хранения, например Kafka, так как полное обновление усечено существующих данных. Возможно, вы не сможете восстановить старые данные, если данные больше не доступны в источнике.
Например:
REFRESH STREAMING TABLE my_bronze_table FULL
Планирование потоковой таблицы для автоматического обновления
Чтобы настроить потоковую таблицу для автоматического обновления на основе определенного расписания, вставьте следующее в редактор запросов и нажмите кнопку "Выполнить".
ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
CRON '<cron-string>'
[ AT TIME ZONE '<timezone-id>' ]];
Например, запросы расписания обновления см. в ALTER STREAMING TABLE.
Отслеживание состояния обновления
Вы можете просмотреть состояние обновления потоковой таблицы, просмотрев конвейер, который управляет потоковой таблицей в пользовательском интерфейсе разностных динамических таблиц или просмотром сведений об обновлении, возвращаемых DESCRIBE EXTENDED
командой для потоковой таблицы.
DESCRIBE EXTENDED <table-name>
Потоковая прием из Kafka
Пример приема потоковой передачи из Kafka см. в read_kafka.
Предоставление пользователям доступа к таблице потоковой передачи
Чтобы предоставить пользователям привилегии SELECT
в таблице потоковой передачи, чтобы они могли запросить ее, вставьте следующее в редактор запросов и нажмите кнопку "Выполнить".
GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>
Дополнительные сведения о предоставлении привилегий для защищаемых объектов каталога Unity см. в разделе "Права каталога Unity" и защищаемые объекты.
Мониторинг запусков с помощью журнала запросов
Вы можете использовать страницу журнала запросов для доступа к сведениям о запросах и профилям запросов, которые помогут определить плохое выполнение запросов и узких мест в конвейере Delta Live Table, используемом для запуска обновлений потоковой таблицы. Общие сведения о типе информации, доступной в журналах запросов и профилях запросов, см. в разделе "Журнал запросов" и "Профиль запросов".
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии. Администраторы рабочей области могут включить эту функцию на странице "Предварительные версии". См. статью "Управление предварительными версиями Azure Databricks".
Все инструкции, связанные с таблицами потоковой передачи, отображаются в журнале запросов. Раскрывающийся список инструкций можно использовать для выбора любой команды и проверки связанных запросов. За всеми CREATE
операторами следует REFRESH
инструкция, которая выполняется асинхронно в конвейере Delta Live Table. Инструкции REFRESH
обычно включают подробные планы запросов, которые предоставляют аналитические сведения о оптимизации производительности.
Чтобы получить доступ к REFRESH
операторам в пользовательском интерфейсе журнала запросов, выполните следующие действия.
- Щелкните в левой боковой панели, чтобы открыть пользовательский интерфейс журнала запросов.
- Выберите флажок REFRESH из фильтра раскрывающегося списка Заявления .
- Щелкните имя инструкции запроса, чтобы просмотреть сводные сведения, такие как длительность запроса и агрегированные метрики.
- Щелкните "Просмотреть профиль запроса", чтобы открыть профиль запроса. Дополнительные сведения о навигации по профилю запроса см . в профиле запросов.
- При необходимости можно использовать ссылки в разделе "Источник запросов", чтобы открыть связанный запрос или конвейер.
Вы также можете получить доступ к сведениям о запросах с помощью ссылок в редакторе SQL или из записной книжки, подключенной к хранилищу SQL.
Примечание.
Потоковая таблица должна быть настроена для запуска с помощью канала предварительной версии . См. раздел "Задать канал среды выполнения".