Поделиться через


Загрузка данных с помощью таблиц потоковой передачи в Databricks SQL

Databricks рекомендует использовать потоковые таблицы для приема данных с помощью Databricks SQL. Потоковая таблица — это таблица , зарегистрированная в каталоге Unity с дополнительной поддержкой потоковой или добавочной обработки данных. Конвейер разностных динамических таблиц автоматически создается для каждой потоковой таблицы. Таблицы потоковой передачи можно использовать для добавочной загрузки данных из Kafka и облачного хранилища объектов.

В этой статье показано использование потоковых таблиц для загрузки данных из облачного хранилища объектов, настроенного в качестве тома каталога Unity (рекомендуется) или внешнего расположения.

Примечание.

Чтобы узнать, как использовать таблицы Delta Lake в качестве источников потоковой передачи и приемников, см. раздел потоковой передачи и записи в разностной таблице.

Внимание

Потоковые таблицы, созданные в Databricks SQL, поддерживаются бессерверным конвейером разностных динамических таблиц. Рабочая область должна поддерживать бессерверные конвейеры для использования этой функции.

Перед началом работы

Перед началом работы необходимо выполнить следующие требования.

Требования к рабочей области:

  • Учетная запись Azure Databricks с включенным бессерверным доступом. Дополнительные сведения см. в разделе "Включение бессерверных хранилищ SQL".
  • Рабочая область с включенным каталогом Unity. Дополнительные сведения см. в разделе "Настройка каталога Unity" и управление ими.

Требования к вычислениям:

Необходимо использовать одно из следующих элементов:

  • Хранилище SQL, использующее Current канал.

  • Вычисление с общим режимом доступа в Databricks Runtime 13.3 LTS или более поздней версии.

  • Вычисление с одним режимом доступа пользователей в Databricks Runtime 15.4 LTS или более поздней версии.

    В Databricks Runtime 15.3 и ниже вы не можете использовать однопользовательские вычисления для запроса таблиц потоковой передачи, принадлежащих другим пользователям. Вы можете использовать вычисление одного пользователя в Databricks Runtime 15.3 и ниже, только если вы владеете потоковой таблицей. Создатель таблицы является владельцем.

    Databricks Runtime 15.4 LTS и более поздних версий поддерживают запросы к таблицам, созданным Delta Live Table для вычислений одного пользователя, независимо от владения таблицами. Чтобы воспользоваться преимуществами фильтрации данных, предоставляемых в Databricks Runtime 15.4 LTS и более поздних версиях, необходимо убедиться, что рабочая область включена для бессерверных вычислений , так как функции фильтрации данных, поддерживающие таблицы, созданные Разностными динамическими таблицами, выполняются на бессерверных вычислениях. Вы можете взимать плату за бессерверные вычислительные ресурсы при использовании отдельных пользователей вычислений для выполнения операций фильтрации данных. Подробные инструкции по управлению доступом для отдельных пользователей.

Требования к разрешениям:

Другие требования:

  • Путь к исходным данным.

    Пример пути тома: /Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>

    Пример пути к внешнему расположению: abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis

    Примечание.

    В этой статье предполагается, что данные, к которым требуется загрузить, хранятся в облачном хранилище, соответствующем тому каталога Unity или внешнему расположению, к которому у вас есть доступ.

Обнаружение и предварительный просмотр исходных данных

  1. На боковой панели рабочей области щелкните "Запросы" и нажмите кнопку "Создать запрос".

  2. В редакторе запросов выберите хранилище SQL, использующее Current канал из раскрывающегося списка.

  3. Вставьте приведенные ниже значения в редактор, заменив значения в угловых скобках (<>) для сведений, определяющих исходные данные, и нажмите кнопку "Выполнить".

    Примечание.

    При запуске read_files табличной функции могут возникнуть ошибки вывода схемы, если значения по умолчанию для функции не удается проанализировать данные. Например, может потребоваться настроить многострочный режим для файлов CSV или JSON. Список параметров синтаксического анализа см. в разделе read_files табличное значение функции.

    /* Discover your data in a volume */
    LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"
    
    /* Preview your data in a volume */
    SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10
    
    /* Discover your data in an external location */
    LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>"
    
    /* Preview your data */
    SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
    

Загрузка данных в потоковую таблицу

Чтобы создать потоковую таблицу из данных в облачном хранилище объектов, вставьте следующую команду в редактор запросов и нажмите кнопку "Выполнить".

/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')

/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')

Настройка канала среды выполнения

Потоковая передача таблиц, созданных с помощью хранилищ SQL, автоматически обновляется с помощью конвейера Delta Live Tables. Конвейеры delta Live Tables используют среду выполнения в канале current по умолчанию. Ознакомьтесь с заметками о выпуске Delta Live Tables и процессом обновления выпуска, чтобы узнать о процессе выпуска.

Databricks рекомендует использовать current канал для рабочих нагрузок. Новые функции сначала выпускаются в preview канале. Конвейер можно задать для канала delta Live Table для предварительной версии, чтобы протестировать новые функции, указав preview в качестве свойства таблицы. Это свойство можно указать при создании таблицы или после создания таблицы с помощью инструкции ALTER.

В следующем примере кода показано, как настроить канал для предварительного просмотра в инструкции CREATE:

CREATE OR REPLACE MATERIALIZED VIEW foo.default.bar
TBLPROPERTIES ('pipelines.channel' = 'preview') as
SELECT
  *
FROM
  range(5)

обновите потоковую таблицу с помощью конвейера DLT

В этом разделе описываются шаблоны обновления таблицы потоковой передачи с последними данными, доступными из источников, определенных в запросе.

При обновлении потоковой таблицы CREATE или REFRESH обработка выполняется с использованием конвейера Delta Live Tables с бессерверной архитектурой. Каждая потоковая таблица, которую вы определяете, имеет ассоциированный конвейер Delta Live Tables.

После выполнения команды REFRESH возвращается ссылка конвейера DLT. Чтобы проверить состояние обновления, можно использовать ссылку конвейера DLT.

Примечание.

Только владелец таблицы может обновить потоковую таблицу, чтобы получить последние данные. Пользователь, создающий таблицу, является владельцем, и владелец не может быть изменен. Вам может потребоваться обновить таблицу потоковой передачи перед использованием запросов перемещения по времени.

См. Что такое Delta Live Tables?.

Принимайте только новые данные

По умолчанию функция read_files считывает все существующие данные в исходном каталоге во время создания таблицы, а затем обрабатывает вновь поступающие записи с каждым обновлением.

Чтобы избежать приема данных, которые уже существуют в исходном каталоге во время создания таблицы, задайте параметр includeExistingFiles для false. Это означает, что только данные, поступающие в каталог после создания таблицы, обрабатываются. Например:

CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
  'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
  includeExistingFiles => false)

Полное обновление таблицы потоковой передачи

Полные обновления повторно обрабатывают все данные, доступные в источнике с помощью последнего определения. Не рекомендуется вызывать полные обновления в источниках, которые не хранят всю историю данных или имеют короткие периоды хранения, например Kafka, так как полное обновление усечено существующих данных. Возможно, вы не сможете восстановить старые данные, если данные больше не доступны в источнике.

Например:

REFRESH STREAMING TABLE my_bronze_table FULL

Планирование потоковой таблицы для автоматического обновления

Чтобы настроить потоковую таблицу для автоматического обновления на основе определенного расписания, вставьте следующее в редактор запросов и нажмите кнопку "Выполнить".

ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
        CRON '<cron-string>'
                [ AT TIME ZONE '<timezone-id>' ]];

Например, запросы расписания обновления см. в ALTER STREAMING TABLE.

Отслеживание состояния обновления

Вы можете просмотреть состояние обновления потоковой таблицы, просмотрев конвейер, который управляет потоковой таблицей в пользовательском интерфейсе разностных динамических таблиц или просмотром сведений об обновлении, возвращаемых DESCRIBE EXTENDED командой для потоковой таблицы.

DESCRIBE EXTENDED <table-name>

Потоковая прием из Kafka

Пример приема потоковой передачи из Kafka см. в read_kafka.

Предоставление пользователям доступа к таблице потоковой передачи

Чтобы предоставить пользователям привилегии SELECT в таблице потоковой передачи, чтобы они могли запросить ее, вставьте следующее в редактор запросов и нажмите кнопку "Выполнить".

GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

Дополнительные сведения о предоставлении привилегий для защищаемых объектов каталога Unity см. в разделе "Права каталога Unity" и защищаемые объекты.

Мониторинг запусков с помощью журнала запросов

Вы можете использовать страницу журнала запросов для доступа к сведениям о запросах и профилям запросов, которые помогут определить плохое выполнение запросов и узких мест в конвейере Delta Live Table, используемом для запуска обновлений потоковой таблицы. Общие сведения о типе информации, доступной в журналах запросов и профилях запросов, см. в разделе "Журнал запросов" и "Профиль запросов".

Внимание

Эта функция предоставляется в режиме общедоступной предварительной версии. Администраторы рабочей области могут включить эту функцию на странице "Предварительные версии". См. статью "Управление предварительными версиями Azure Databricks".

Все инструкции, связанные с таблицами потоковой передачи, отображаются в журнале запросов. Раскрывающийся список инструкций можно использовать для выбора любой команды и проверки связанных запросов. За всеми CREATE операторами следует REFRESH инструкция, которая выполняется асинхронно в конвейере Delta Live Table. Инструкции REFRESH обычно включают подробные планы запросов, которые предоставляют аналитические сведения о оптимизации производительности.

Чтобы получить доступ к REFRESH операторам в пользовательском интерфейсе журнала запросов, выполните следующие действия.

  1. Щелкните Значок журнала в левой боковой панели, чтобы открыть пользовательский интерфейс журнала запросов.
  2. Выберите флажок REFRESH из фильтра раскрывающегося списка Заявления .
  3. Щелкните имя инструкции запроса, чтобы просмотреть сводные сведения, такие как длительность запроса и агрегированные метрики.
  4. Щелкните "Просмотреть профиль запроса", чтобы открыть профиль запроса. Дополнительные сведения о навигации по профилю запроса см . в профиле запросов.
  5. При необходимости можно использовать ссылки в разделе "Источник запросов", чтобы открыть связанный запрос или конвейер.

Вы также можете получить доступ к сведениям о запросах с помощью ссылок в редакторе SQL или из записной книжки, подключенной к хранилищу SQL.

Примечание.

Потоковая таблица должна быть настроена для запуска с помощью канала предварительной версии . См. раздел "Задать канал среды выполнения".

Дополнительные ресурсы