Поделиться через


Использование онлайн-таблиц для обслуживания функций в режиме реального времени

Внимание

Онлайн-таблицы находятся в общедоступной предварительной версии в следующих регионах: westus, eastus, eastus2, northeurope. westeurope Сведения о ценах см. в разделе "Цены на онлайн-таблицы".

Онлайн-таблица представляет собой копию разностной таблицы, которая хранится в формате, ориентированном на строки, оптимизированном для доступа в Интернете. Сетевые таблицы — это полностью бессерверные таблицы, которые обеспечивают автоматическую масштабирование пропускной способности с нагрузкой запроса и обеспечивают низкую задержку и высокий доступ к данным любого масштаба. Онлайн-таблицы предназначены для работы с приложениями для быстрого поиска данных с помощью мозаичных моделей ИИ, обслуживания функций и получения дополненного поколения (RAG).

Вы также можете использовать онлайн-таблицы в запросах с помощью Федерации Lakehouse. При использовании Федерации Lakehouse необходимо использовать бессерверное хранилище SQL для доступа к онлайн-таблицам. Поддерживаются только операции чтения (SELECT). Эта возможность предназначена только для интерактивных или отладок и не должна использоваться для рабочих или критически важных рабочих нагрузок.

Создание интерактивной таблицы с помощью пользовательского интерфейса Databricks — это одношаговый процесс. Просто выберите таблицу Delta в обозревателе каталогов и выберите " Создать онлайн-таблицу". Для создания и управления онлайн-таблицами можно также использовать REST API или пакет SDK Databricks. См. статью " Работа с онлайн-таблицами с помощью API".

Требования

  • Рабочая область должна быть включена для каталога Unity. Следуйте документации, чтобы создать хранилище метаданных каталога Unity, включите его в рабочей области и создайте каталог.
  • Модель должна быть зарегистрирована в каталоге Unity для доступа к онлайн-таблицам.

Работа с онлайн-таблицами с помощью пользовательского интерфейса

В этом разделе описывается создание и удаление интерактивных таблиц, а также проверка состояния и активация обновлений веб-таблиц.

Создание интерактивной таблицы с помощью пользовательского интерфейса

Вы создаете онлайн-таблицу с помощью обозревателя каталогов. Сведения о необходимых разрешениях см. в разделе "Разрешения пользователей".

  1. Чтобы создать интерактивную таблицу, исходная таблица Delta должна иметь первичный ключ. Если в таблице Delta, которую вы хотите использовать, нет первичного ключа, создайте ее, выполнив следующие инструкции: используйте существующую таблицу Delta в каталоге Unity в качестве таблицы компонентов.

  2. В обозревателе каталогов перейдите к исходной таблице, которую требуется синхронизировать с веб-таблицей. В меню "Создать" выберите "Онлайн- таблица".

    Выберите команду

  3. Используйте селекторы в диалоговом окне, чтобы настроить интерактивную таблицу.

    Диалоговое окно настройки интерактивной таблицы

    Имя: имя, используемое для интерактивной таблицы в каталоге Unity.

    Первичный ключ: столбцы в исходной таблице для использования в качестве первичных ключей в интерактивной таблице.

    Ключ таймерии: (необязательно). Столбец в исходной таблице для использования в качестве ключа таймерии. При указании онлайн-таблица включает только строку с последним значением ключа таймерии для каждого первичного ключа.

    Режим синхронизации. Указывает, как конвейер синхронизации обновляет интерактивную таблицу. Выберите один из моментальных снимков, триггеров или непрерывных.

    Политика Description
    Snapshot Конвейер выполняется один раз, чтобы создать моментальный снимок исходной таблицы и скопировать его в интерактивную таблицу. Последующие изменения исходной таблицы автоматически отражаются в интерактивной таблице, принимая новый снимок источника и создавая новую копию. Содержимое интерактивной таблицы обновляется атомарно.
    Активируемые Конвейер запускается один раз, чтобы создать начальную копию моментального снимка исходной таблицы в интерактивной таблице. В отличие от режима синхронизации моментальных снимков, когда таблица в сети обновляется, извлекаются и применяются только изменения после последнего выполнения конвейера. Добавочное обновление можно активировать вручную или автоматически активировать в соответствии с расписанием.
    Непрерывные Конвейер выполняется непрерывно. Последующие изменения исходной таблицы постепенно применяются к интерактивной таблице в режиме потоковой передачи в режиме потоковой передачи в режиме реального времени. Обновление вручную не требуется.

Примечание.

Для поддержки режима триггерной или непрерывной синхронизации исходная таблица должна включать канал изменений данных.

  1. По завершении нажмите кнопку "Подтвердить". Откроется страница таблицы в Интернете.
  2. Новая интерактивная таблица создается в каталоге, схеме и имени, указанном в диалоговом окне создания. В обозревателе каталогов таблица в сети указывается.Значок таблицы в Сети

Получение состояния и активации обновлений с помощью пользовательского интерфейса

Чтобы проверить состояние интерактивной таблицы, щелкните имя таблицы в каталоге, чтобы открыть ее. Откроется страница "Обзор" с открытой вкладкой "Обзор ". В разделе "Прием данных" отображается состояние последнего обновления. Чтобы активировать обновление, нажмите кнопку "Синхронизировать сейчас". В разделе "Прием данных" также содержится ссылка на конвейер Разностных динамических таблиц, который обновляет таблицу.

просмотр страницы таблицы в интернете в каталоге

Планирование периодических обновлений

Для онлайн-таблиц с режимом синхронизации моментальных снимков или триггеров можно запланировать автоматические периодические обновления. Расписание обновления управляется конвейером Разностных динамических таблиц, который обновляет таблицу.

  1. В обозревателе каталогов перейдите к интерактивной таблице.
  2. В разделе " Прием данных" щелкните ссылку на конвейер.
  3. В правом верхнем углу щелкните "Расписание" и добавьте новое расписание или обновите существующие расписания.

Удаление интерактивной таблицы с помощью пользовательского интерфейса

На странице таблицы в Интернете выберите "Удалить " в Меню Кебаб меню кебаб.

Работа с онлайн-таблицами с помощью API

Вы также можете использовать пакет SDK Databricks или REST API для создания и управления онлайн-таблицами.

Справочные сведения см. в справочной документации по пакету SDK Databricks для Python или REST API.

Требования

Пакет SDK databricks версии 0.20 или более поздней.

Создание онлайн-таблицы с помощью API

Пакет SDK Databricks — Python

from pprint import pprint
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import *

w = WorkspaceClient(host='https://xxx.databricks.com', token='xxx')

# Create an online table
spec = OnlineTableSpec(
  primary_key_columns=["pk_col"],
  source_table_full_name="main.default.source_table",
  run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({'triggered': 'true'})
)

online_table = OnlineTable(
  name="main.default.my_online_table",  # Fully qualified table name
  spec=spec  # Online table specification
)

w.online_tables.create_and_wait(table=online_table)

REST API

curl --request POST "https://xxx.databricks.com/api/2.0/online-tables" \
--header "Authorization: Bearer xxx" \
--data '{
    "name": "main.default.my_online_table",
    "spec": {
        "run_triggered": {},
        "source_table_full_name": "main.default.source_table",
        "primary_key_columns": ["a"]
    }
  }'

После создания таблицы в Сети автоматически начинается синхронизация.

Получение состояния и обновление триггера с помощью API

Вы можете просмотреть состояние и спецификацию онлайн-таблицы, следуя приведенному ниже примеру. Если ваша онлайн-таблица не является непрерывной и вы хотите активировать обновление данных вручную, вы можете использовать API конвейера для этого.

Используйте идентификатор конвейера, связанный с интерактивной таблицей в спецификации веб-таблицы, и запустите новое обновление конвейера для активации обновления. Это эквивалентно нажатию кнопки "Синхронизация сейчас " в пользовательском интерфейсе таблицы в обозревателе каталогов.

Пакет SDK Databricks — Python

pprint(w.online_tables.get('main.default.my_online_table'))

# Sample response
OnlineTable(name='main.default.my_online_table',
    spec=OnlineTableSpec(perform_full_copy=None,
        pipeline_id='some-pipeline-id',
        primary_key_columns=['pk_col'],
        run_continuously=None,
        run_triggered={},
        source_table_full_name='main.default.source_table',
        timeseries_key=None),
    status=OnlineTableStatus(continuous_update_status=None,
        detailed_state=OnlineTableState.PROVISIONING,
        failed_status=None,
        message='Online Table creation is '
            'pending. Check latest status in '
            'Delta Live Tables: '
            'https://xxx.databricks.com/pipelines/some-pipeline-id',
        provisioning_status=None,
        triggered_update_status=None))

# Trigger an online table refresh by calling the pipeline API. To discard all existing data
# in the online table before refreshing, set "full_refresh" to "True". This is useful if your
# online table sync is stuck due to, for example, the source table being deleted and recreated
# with the same name while the sync was running.
w.pipelines.start_update(pipeline_id='some-pipeline-id', full_refresh=True)

REST API

curl --request GET \
  "https://xxx.databricks.com/api/2.0/online-tables/main.default.my_online_table" \
  --header "Authorization: Bearer xxx"

# Sample response
{
  "name": "main.default.my_online_table",
  "spec": {
    "run_triggered": {},
    "source_table_full_name": "main.default.source_table",
    "primary_key_columns": ["pk_col"],
    "pipeline_id": "some-pipeline-id"
  },
  "status": {
    "detailed_state": "PROVISIONING",
    "message": "Online Table creation is pending. Check latest status in Delta Live Tables: https://xxx.databricks.com#joblist/pipelines/some-pipeline-id"
  }
}

# Trigger an online table refresh by calling the pipeline API. To discard all existing data
# in the online table before refreshing, set "full_refresh" to "True". This is useful if your
# online table sync is stuck due to, for example, the source table being deleted and recreated
# with the same name while the sync was running.
curl --request POST "https://xxx.databricks.com/api/2.0/pipelines/some-pipeline-id/updates" \
  --header "Authorization: Bearer xxx" \
  --data '{
    "full_refresh": true
  }'

Удаление интерактивной таблицы с помощью API

Пакет SDK Databricks — Python

w.online_tables.delete('main.default.my_online_table')

REST API

curl --request DELETE \
  "https://xxx.databricks.com/api/2.0/online-tables/main.default.my_online_table" \
  --header "Authorization: Bearer xxx"

Удаление интерактивной таблицы останавливает текущую синхронизацию данных и освобождает все свои ресурсы.

Обслуживание данных таблицы в Сети с помощью конечной точки обслуживания функций

Для моделей и приложений, размещенных за пределами Databricks, можно создать конечную точку обслуживания функций для обслуживания функций из веб-таблиц. Конечная точка предоставляет функции с низкой задержкой с помощью REST API.

  1. Создайте спецификацию компонентов.

    При создании спецификации компонентов необходимо указать исходную таблицу Delta. Это позволяет использовать спецификацию функций как в автономном режиме, так и в сетевых сценариях. Для поиска по сети конечная точка обслуживания автоматически использует интерактивную таблицу для выполнения поиска функций с низкой задержкой.

    Исходная разностная таблица и интерактивная таблица должны использовать один и тот же первичный ключ.

    Спецификация функций можно просмотреть на вкладке "Функция " в обозревателе каталогов.

    from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
    
    fe = FeatureEngineeringClient()
    fe.create_feature_spec(
      name="catalog.default.user_preferences_spec",
      features=[
        FeatureLookup(
          table_name="user_preferences",
          lookup_key="user_id"
        )
      ]
    )
    
  2. Создайте конечную точку обслуживания компонентов.

    На этом шаге предполагается, что вы создали онлайн-таблицу с именем user_preferences_online_table , которая синхронизирует данные из таблицы user_preferencesDelta. Используйте спецификацию компонентов для создания конечной точки обслуживания компонентов. Конечная точка предоставляет данные через REST API с помощью связанной интерактивной таблицы.

    Примечание.

    Пользователь, выполняющий эту операцию, должен быть владельцем автономной таблицы и интерактивной таблицы.

    Пакет SDK Databricks — Python

    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput
    
    workspace = WorkspaceClient()
    
    # Create endpoint
    endpoint_name = "fse-location"
    
    workspace.serving_endpoints.create_and_wait(
      name=endpoint_name,
      config=EndpointCoreConfigInput(
        served_entities=[
          ServedEntityInput(
            entity_name=feature_spec_name,
            scale_to_zero_enabled=True,
            workload_size="Small"
          )
        ]
      )
    )
    

    API Python

    from databricks.feature_engineering.entities.feature_serving_endpoint import (
      ServedEntity,
      EndpointCoreConfig,
    )
    
    fe.create_feature_serving_endpoint(
      name="user-preferences",
      config=EndpointCoreConfig(
        served_entities=ServedEntity(
          feature_spec_name="catalog.default.user_preferences_spec",
          workload_size="Small",
          scale_to_zero_enabled=True
        )
      )
    )
    
  3. Получение данных из конечной точки обслуживания компонентов.

    Чтобы получить доступ к конечной точке API, отправьте HTTP-запрос GET на URL-адрес конечной точки. В примере показано, как это сделать с помощью API Python. Сведения о других языках и средствах см. в разделе "Обслуживание компонентов".

    # Set up credentials
    export DATABRICKS_TOKEN=...
    
    url = "https://{workspace_url}/serving-endpoints/user-preferences/invocations"
    
    headers = {'Authorization': f'Bearer {DATABRICKS_TOKEN}', 'Content-Type': 'application/json'}
    
    data = {
      "dataframe_records": [{"user_id": user_id}]
    }
    data_json = json.dumps(data, allow_nan=True)
    
    response = requests.request(method='POST', headers=headers, url=url, data=data_json)
    if response.status_code != 200:
      raise Exception(f'Request failed with status {response.status_code}, {response.text}')
    
    print(response.json()['outputs'][0]['hotel_preference'])
    

Использование онлайн-таблиц с приложениями RAG

Приложения RAG — это распространенный вариант использования для онлайн-таблиц. Вы создаете таблицу в сети для структурированных данных, необходимых приложению RAG, и размещаете ее в конечной точке обслуживания компонентов. Приложение RAG использует конечную точку обслуживания функций для поиска соответствующих данных из интерактивной таблицы.

Типичные шаги приведены ниже.

  1. Создайте конечную точку обслуживания компонентов.
  2. Создайте средство с помощью LangChain или любого аналогичного пакета, использующего конечную точку для поиска соответствующих данных.
  3. Используйте средство в агенте LangChain или аналогичном агенте для получения соответствующих данных.
  4. Создайте конечную точку обслуживания модели для размещения приложения.

Пошаговые инструкции и пример записной книжки см . в примере проектирования компонентов: структурированное приложение RAG.

Примеры записных книжек

В следующей записной книжке показано, как публиковать функции в онлайн-таблицах для обслуживания в режиме реального времени и автоматического поиска функций.

Демонстрационная записная книжка для таблиц в Интернете

Получить записную книжку

Использование онлайн-таблиц с мозаичной моделью ИИ

Вы можете использовать онлайн-таблицы для поиска функций для обслуживания модели ИИ Мозаики. При синхронизации таблицы компонентов с онлайн-таблицей модели обучены с помощью функций из этой таблицы функций автоматически искать значения признаков из интерактивной таблицы во время вывода. Дополнительная настройка не требуется.

  1. Используйте для FeatureLookup обучения модели.

    Для обучения модели используйте функции из автономной таблицы функций в наборе обучения модели, как показано в следующем примере:

    training_set = fe.create_training_set(
      df=id_rt_feature_labels,
      label='quality',
      feature_lookups=[
          FeatureLookup(
              table_name="user_preferences",
              lookup_key="user_id"
          )
      ],
      exclude_columns=['user_id'],
    )
    
  2. Служите моделью с помощью службы модели ИИ Мозаики. Модель автоматически ищет функции из интерактивной таблицы. Дополнительные сведения см. в статье "Автоматическое поиск функций" с помощью Databricks Model Services .

Разрешения пользователя

Для создания интерактивной таблицы необходимо иметь следующие разрешения:

  • SELECT привилегии в исходной таблице.
  • USE_CATALOG привилегии в целевом каталоге.
  • USE_SCHEMA и CREATE_TABLE привилегии для конечной схемы.

Чтобы управлять конвейером синхронизации данных в сети, необходимо либо быть владельцем интерактивной таблицы, либо предоставить привилегию REFRESH в интерактивной таблице. Пользователи, у которых нет USE_CATALOG и USE_SCHEMA привилегий в каталоге, не увидят таблицу в интернете в обозревателе каталогов.

Хранилище метаданных каталога Unity должно иметь модель привилегий версии 1.0.

Модель разрешений конечной точки

Уникальный субъект-служба автоматически создается для службы компонентов или конечной точки обслуживания модели с ограниченными разрешениями, необходимыми для запроса данных из онлайн-таблиц. Этот субъект-служба позволяет конечным точкам получать доступ к данным независимо от пользователя, создавшего ресурс, и гарантирует, что конечная точка может продолжать функционировать, если создатель покидает рабочую область.

Время существования этого субъекта-службы — это время существования конечной точки. Журналы аудита могут указывать системные записи для владельца каталога каталога Unity, предоставляющего этому субъекту-службе необходимые привилегии.

Ограничения

  • Для каждой исходной таблицы поддерживается только одна онлайн-таблица.
  • В оперативной таблице и исходной таблице может быть не более 1000 столбцов.
  • Столбцы типов данных ARRAY, MAP или STRUCT нельзя использовать в качестве первичных ключей в интерактивной таблице.
  • Если столбец используется в качестве первичного ключа в интерактивной таблице, все строки в исходной таблице, где столбец содержит значения NULL, игнорируются.
  • Внешние, системные и внутренние таблицы не поддерживаются в качестве исходных таблиц.
  • Исходные таблицы без веб-канала изменений разностных изменений поддерживают только режим синхронизации моментальных снимков .
  • Таблицы разностного общего доступа поддерживаются только в режиме синхронизации моментальных снимков .
  • Имена каталогов, схем и таблиц в интерактивной таблице могут содержать только буквенно-цифровые символы и символы подчеркивания и не должны начинаться с чисел. Дефисы (-) не допускаются.
  • Столбцы типа String ограничены длиной 64 КБ.
  • Имена столбцов ограничены 64 символами в длину.
  • Максимальный размер строки составляет 2 МБ.
  • Объединенный размер всех интерактивных таблиц в хранилище метаданных каталога Unity во время общедоступной предварительной версии составляет 2 ТБ несжатых пользовательских данных.
  • Максимальное количество запросов в секунду (QPS) составляет 12 000. Обратитесь к группе учетной записи Databricks, чтобы увеличить ограничение.

Устранение неполадок

Не отображается параметр "Создать онлайн-таблицу"

Причина обычно заключается в том, что таблица, которую вы пытаетесь синхронизировать из (исходной таблицы) не поддерживается. Убедитесь, что тип защищенной таблицы исходной таблицы (показан на вкладке сведений обозревателя каталогов) является одним из поддерживаемых вариантов ниже.

  • TABLE_EXTERNAL
  • TABLE_DELTA
  • TABLE_DELTA_EXTERNAL
  • TABLE_DELTASHARING
  • TABLE_DELTASHARING_MUTABLE
  • TABLE_STREAMING_LIVE_TABLE
  • TABLE_STANDARD
  • TABLE_FEATURE_STORE
  • TABLE_FEATURE_STORE_EXTERNAL
  • TABLE_VIEW
  • TABLE_VIEW_DELTASHARING
  • TABLE_MATERIALIZED_VIEW

Не удается выбрать режимы активации или непрерывной синхронизации при создании интерактивной таблицы

Это происходит, если исходная таблица не включает канал данных изменений Delta или если он является представлением или материализованным представлением. Чтобы использовать режим добавочной синхронизации, включите веб-канал изменений в исходной таблице или используйте таблицу без представления.

Обновление таблицы в Сети завершается сбоем или состояние отображается в автономном режиме

Чтобы начать устранение этой ошибки, щелкните идентификатор конвейера, который отображается на вкладке "Обзор " в интерактивной таблице в обозревателе каталогов.

Сбой конвейера веб-таблиц

На отображаемой странице пользовательского интерфейса конвейера щелкните запись, которая говорит "Не удалось разрешить поток "__online_table".

Сообщение об ошибке конвейера веб-таблиц

Появится всплывающее окно с подробными сведениями в разделе сведений об ошибке.

Сведения об ошибке в онлайн-таблицах

Ниже приведены распространенные причины ошибок.

  • Исходная таблица была удалена или удалена и повторно создана с тем же именем, в то время как интерактивная таблица синхронизирована. Это особенно распространено с непрерывными онлайн-таблицами, так как они постоянно синхронизируются.

  • Доступ к исходной таблице невозможен с помощью бессерверных вычислений из-за параметров брандмауэра. В этом случае в разделе сведений об ошибке может появиться сообщение об ошибке "Не удалось запустить службу DLT в кластере xxx...".

  • Совокупный размер онлайн-таблиц превышает ограничение на 2 ТБ (несжатый размер) в хранилище метаданных. Ограничение на 2 ТБ ссылается на несжатый размер после расширения таблицы Delta в формате, ориентированном на строки. Размер таблицы в формате строки может быть значительно больше, чем размер таблицы Delta, показанной в обозревателе каталогов, которая ссылается на сжатый размер таблицы в формате, ориентированном на столбцы. Разница может превышать 100x в зависимости от содержимого таблицы.

    Чтобы оценить несжатый размер разностной таблицы, используйте следующий запрос из бессерверного хранилища SQL. Запрос возвращает предполагаемый размер развернутой таблицы в байтах. Успешное выполнение этого запроса также подтверждает, что бессерверные вычисления могут получить доступ к исходной таблице.

    SELECT sum(length(to_csv(struct(*)))) FROM `source_table`;