共用方式為


使用在線 tables 進行即時功能服務

重要

在線 tables 在下列區域中處於公開預覽狀態:westuseastuseastus2northeuropewesteurope。 如需定價資訊,請參閱 Online Tables 定價

在線 table 是 Delta Table 的唯讀複本,儲存在針對在線存取優化的數據列導向格式中。 線上 tables 是完全無伺服器的 tables,隨著請求負載自動調整吞吐量,並提供低延遲和高吞吐量來存取任何規模的資料。 線上 tables 旨在與 Mosaic AI 模型服務、特徵服務及檢索增強生成(RAG)應用程式搭配使用,where 用於快速資料查詢。

您也可以在查詢中使用在線 tables,並使用 Lakehouse Federation。 使用 Lakehouse 聯盟時,您必須使用無伺服器 SQL 資料倉庫來存取線上 tables。 僅支援讀取作業 (SELECT)。 此功能僅用於互動或偵錯用途,不應用於生產或任務關鍵性工作負載。

使用 Databricks UI 建立線上 table 是一步驟的過程。 只要在 Catalog Explorer 中 select Delta table,select建立在線 table。 您也可以使用 REST API 或 Databricks SDK 來建立和管理在線 tables。 請參閱 使用 API進行線上 tables 的工作。

需求

  • 必須啟用 Unity Catalog的工作區。 請遵循 文件,建立 Unity Catalog Metastore,在工作區中啟用它,然後建立 Catalog。
  • 必須在 Unity Catalog 中註冊一個模型,才能訪問線上 tables。

使用UI操作線上 tables

本節說明如何建立和刪除在線 tables,以及如何檢查在線 tables的狀態和觸發更新。

使用UI建立線上 table

您可以使用 Catalog Explorer 建立在線 table。 如需有關必要權限的資訊,請參閱使用者權限

  1. 若要建立線上 table,來源 Delta table 必須具有主鍵。 如果您想要使用的 Delta table 沒有主鍵,請依照下列指示建立一個:在 Unity Catalog 中使用現有的 Delta table 做為功能 table

  2. 在 [Catalog 總管] 中,流覽至您想要 sync 至在線 table的來源 table。 從 [建立] 功能表,選擇 select線上 table

    select 在線建立 table

  3. 使用對話框中的選取器來設定線上 table。

    設定在線 table 對話框

    名稱:在 Unity Catalog中用於在線 table 的名稱。

    主鍵:使用來源 table 中的 Column作為線上 table的主鍵。

    時間序索引鍵:(選用)。 來源中的 Columntable 作為時間序列鍵使用。 指定時,線上 table 僅包含每個主鍵所對應具有最新時間序列鍵值的數據行。

    Sync 模式:指定同步管線如何更新線上 table。 Select 快照觸發連續之一。

    原則 描述
    Snapshot 管線會執行一次,以擷取來源 table 的快照,並將其複製到線上 table。 來源 table 的後續變更會自動反映在在線上的 table,方法是擷取來源的新快照並建立新的複本。 在線 table 的內容會以原子性方式更新。
    觸發 流程會執行一次,以在線上 table中建立來源 table 的初始快照集複本。 不同於快照集 sync 模式,當重新整理在線 table 時,只會擷取上次管線執行后所做的變更,並套用至在線 table。 累加式 refresh 可以根據排程手動觸發或自動觸發。
    連續 管線持續執行。 來源 table 的後續變更會以逐步方式即時套用至線上 table 串流模式。 不需要手冊 refresh。

注意

若要支援 觸發連續sync 模式,來源 table 必須啟用 變更資料源

  1. 完成時,按一下 [確認]。 線上 table 頁面隨即出現。
  2. 新的線上 table 會在建立對話框中指定的 catalog、schema和名稱中建立。 在 [Catalog 總管] 中,table 的在線狀態由 table 圖示表示。

使用UI Get 狀態和觸發更新

若要檢查在線 table的狀態,請按一下 Catalog 中的 table 名稱以開啟它。 在線 table 頁面隨即出現,並開啟 概觀和 索引標籤。 資料匯入 區段會顯示最新 update的狀態。 若要觸發 update,請按下 Sync 現在資料內嵌 區段也包含更新 table之 Delta Live Tables 管線的連結。

中在線 頁面的 檢視

排程定期更新

您可以為使用 快照集觸發sync 模式的線上 tables 設定自動定期更新的排程。 Delta Live Tables 管線會更新 table並負責管理 update 排程。

  1. 在 [Catalog 總管] 中,導航至線上 table。
  2. 在 [ 數據內嵌 ] 區段中,按兩下管線的連結。
  3. 在右上角,點擊 [排程],然後新增一個排程或 update 現有的排程。

使用UI刪除線上 table

從在線 table 頁面,selectKebab 功能表 kebab 功能表刪除

使用 API 處理線上 tables

您也可以使用 Databricks SDK 或 REST API 來建立和管理在線 tables。

如需參考資訊,請參閱適用於 Python 的 Databricks SDKREST API 的參考文件。

需求

Databricks SDK 0.20 版或更新版本。

使用 API 建立線上 table

Databricks SDK - Python

from pprint import pprint
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import *

w = WorkspaceClient(host='https://xxx.databricks.com', token='xxx')

# Create an online table
spec = OnlineTableSpec(
  primary_key_columns=["pk_col"],
  source_table_full_name="main.default.source_table",
  run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({'triggered': 'true'})
)

online_table = OnlineTable(
  name="main.default.my_online_table",  # Fully qualified table name
  spec=spec  # Online table specification
)

w.online_tables.create_and_wait(table=online_table)

REST API

curl --request POST "https://xxx.databricks.com/api/2.0/online-tables" \
--header "Authorization: Bearer xxx" \
--data '{
    "name": "main.default.my_online_table",
    "spec": {
        "run_triggered": {},
        "source_table_full_name": "main.default.source_table",
        "primary_key_columns": ["a"]
    }
  }'

在線 table 會在建立後自動開始同步處理。

使用 API 控制狀態 Get 並觸發 refresh

您可以遵循下列範例,檢視在線 table 的狀態和規格。 如果您的在線 table 不是連續的,而且您想要觸發其數據的手動 refresh,您可以使用管線 API 來執行此動作。

在線上 table 規範中使用與線上 table 相關聯的管線標識碼,並在管線上啟動新的 update 來觸發 refresh。 這相當於在 [Catalog 瀏覽器] 的在線 [table UI] 中點擊現在的 [Sync]。

Databricks SDK - Python

pprint(w.online_tables.get('main.default.my_online_table'))

# Sample response
OnlineTable(name='main.default.my_online_table',
    spec=OnlineTableSpec(perform_full_copy=None,
        pipeline_id='some-pipeline-id',
        primary_key_columns=['pk_col'],
        run_continuously=None,
        run_triggered={},
        source_table_full_name='main.default.source_table',
        timeseries_key=None),
    status=OnlineTableStatus(continuous_update_status=None,
        detailed_state=OnlineTableState.PROVISIONING,
        failed_status=None,
        message='Online Table creation is '
            'pending. Check latest status in '
            'Delta Live Tables: '
            'https://xxx.databricks.com/pipelines/some-pipeline-id',
        provisioning_status=None,
        triggered_update_status=None))

# Trigger an online table refresh by calling the pipeline API. To discard all existing data
# in the online table before refreshing, set "full_refresh" to "True". This is useful if your
# online table sync is stuck due to, for example, the source table being deleted and recreated
# with the same name while the sync was running.
w.pipelines.start_update(pipeline_id='some-pipeline-id', full_refresh=True)

REST API

curl --request GET \
  "https://xxx.databricks.com/api/2.0/online-tables/main.default.my_online_table" \
  --header "Authorization: Bearer xxx"

# Sample response
{
  "name": "main.default.my_online_table",
  "spec": {
    "run_triggered": {},
    "source_table_full_name": "main.default.source_table",
    "primary_key_columns": ["pk_col"],
    "pipeline_id": "some-pipeline-id"
  },
  "status": {
    "detailed_state": "PROVISIONING",
    "message": "Online Table creation is pending. Check latest status in Delta Live Tables: https://xxx.databricks.com#joblist/pipelines/some-pipeline-id"
  }
}

# Trigger an online table refresh by calling the pipeline API. To discard all existing data
# in the online table before refreshing, set "full_refresh" to "True". This is useful if your
# online table sync is stuck due to, for example, the source table being deleted and recreated
# with the same name while the sync was running.
curl --request POST "https://xxx.databricks.com/api/2.0/pipelines/some-pipeline-id/updates" \
  --header "Authorization: Bearer xxx" \
  --data '{
    "full_refresh": true
  }'

使用 API 來刪除線上的 table

Databricks SDK - Python

w.online_tables.delete('main.default.my_online_table')

REST API

curl --request DELETE \
  "https://xxx.databricks.com/api/2.0/online-tables/main.default.my_online_table" \
  --header "Authorization: Bearer xxx"

刪除線上 table 會停止任何進行中的資料同步,並釋放其所有資源。

使用功能服務端點提供在線 table 數據

針對裝載於 Databricks 外部的模型和應用程式,您可以建立功能服務端點,以從在線 tables提供功能。 端點使用 REST API 以低延遲提供特徵。

  1. 建立特徵規格。

    當您建立功能規格時,您會指定來源 Delta table。 這允許同時在離線和線上案例中使用特徵規格。 針對線上查詢,服務端點會自動使用線上 table 來執行低延遲特徵查詢。

    來源 Delta table 和線上 table 必須使用相同的主鍵。

    功能規格可以在 Catalog Explorer 的 [函式] 索引標籤中檢視。

    from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
    
    fe = FeatureEngineeringClient()
    fe.create_feature_spec(
      name="catalog.default.user_preferences_spec",
      features=[
        FeatureLookup(
          table_name="user_preferences",
          lookup_key="user_id"
        )
      ]
    )
    
  2. 建立特徵服務端點。

    此步驟假設您已建立名為 user_preferences_online_table 的線上 table,用來同步 Delta tableuser_preferences中的數據。 使用特徵規格來建立特徵服務端點。 端點會透過相關的在線 table,使用 REST API 功能提供資料。

    注意

    執行這項作業的用戶必須是離線 table 和在線 table的擁有者。

    Databricks SDK - Python

    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput
    
    workspace = WorkspaceClient()
    
    # Create endpoint
    endpoint_name = "fse-location"
    
    workspace.serving_endpoints.create_and_wait(
      name=endpoint_name,
      config=EndpointCoreConfigInput(
        served_entities=[
          ServedEntityInput(
            entity_name=feature_spec_name,
            scale_to_zero_enabled=True,
            workload_size="Small"
          )
        ]
      )
    )
    

    Python API

    from databricks.feature_engineering.entities.feature_serving_endpoint import (
      ServedEntity,
      EndpointCoreConfig,
    )
    
    fe.create_feature_serving_endpoint(
      name="user-preferences",
      config=EndpointCoreConfig(
        served_entities=ServedEntity(
          feature_spec_name="catalog.default.user_preferences_spec",
          workload_size="Small",
          scale_to_zero_enabled=True
        )
      )
    )
    
  3. 從功能提供端點獲取的 Get 數據。

    若要存取 API 端點,請將 HTTP GET 要求傳送至端點 URL。 此範例顯示如何使用 Python API 執行此作業。 如需其他語言和工具,請參閱特徵服務

    # Set up credentials
    export DATABRICKS_TOKEN=...
    
    url = "https://{workspace_url}/serving-endpoints/user-preferences/invocations"
    
    headers = {'Authorization': f'Bearer {DATABRICKS_TOKEN}', 'Content-Type': 'application/json'}
    
    data = {
      "dataframe_records": [{"user_id": user_id}]
    }
    data_json = json.dumps(data, allow_nan=True)
    
    response = requests.request(method='POST', headers=headers, url=url, data=data_json)
    if response.status_code != 200:
      raise Exception(f'Request failed with status {response.status_code}, {response.text}')
    
    print(response.json()['outputs'][0]['hotel_preference'])
    

搭配RAG應用程式使用在線 tables

RAG 應用程式是在線 tables的常見使用案例。 您為 RAG 應用程式所需的結構化資料建立一個在線 table,並將其託管在功能服務端點上。 RAG 應用程式會使用提供端點的功能,從在線 table查閱相關數據。

常見步驟如下:

  1. 建立特徵服務端點。
  2. 使用 LangChain 或任何使用端點查詢相關資料的類似套件建立一個工具。
  3. 使用 LangChain 代理程式或類似代理程式中的工具來擷取相關資料。
  4. 建立模型服務端點來託管應用程式。

如需逐步指示和範例筆記本,請參閱特徵工程範例:結構化 RAG 應用程式

筆記本範例

下列筆記本說明如何將特徵發佈到線上 tables,以進行即時服務和自動化特徵查找。

線上 tables 示範筆記本

Get 筆記本

使用 Mosaic AI 模型服務的線上 tables

您可以使用在線 tables 來查閱馬賽克 AI 模型服務的功能。 當您 sync 功能 table 至線上 table時,使用該功能 table 所訓練的模型會在推斷期間自動從線上 table 查詢功能 values。 不需要進行其他組態設定。

  1. 使用 FeatureLookup 訓練模型。

    針對模型訓練,請使用模型訓練 set中的離線功能 table,如下列範例所示:

    training_set = fe.create_training_set(
      df=id_rt_feature_labels,
      label='quality',
      feature_lookups=[
          FeatureLookup(
              table_name="user_preferences",
              lookup_key="user_id"
          )
      ],
      exclude_columns=['user_id'],
    )
    
  2. 使用 Mosaic AI 模型服務為模型提供服務。 此模型會自動從在線 table檢索特徵。 如需詳細資料,請參閱使用 Databricks 模型服務自動查詢特徵

使用者權限

您必須具有下列許可權,才能建立線上 table:

  • 在來源 table上的 SELECT 許可權。
  • catalog目的地上的 USE_CATALOG 許可權。
  • 目的地 schema上的 USE_SCHEMACREATE_TABLE 許可權。

若要管理在線 table的數據同步處理管線,您必須是在線 table 的擁有者,或被授與在線 table的 REFRESH 許可權。 catalog 上沒有USE_CATALOG和USE_SCHEMA許可權的使用者將不會在 Catalog Explorer 中看到在線 table。

Unity Catalog 中繼存放區必須具有 權限模型版本 1.0

端點權限模型

系統會自動為功能提供或模型提供端點建立一個獨特的服務主體,並僅擁有查詢在線數據所需的有限的許可權 tables。 此服務主體允許端點獨立於建立資源的使用者存取資料,並確保當建立者離開工作區時端點可以繼續運作。

此服務主體的存留期是端點的存留期。 稽核記錄可能顯示系統為 Unity 擁有者產生的 Catalogcatalog 記錄,授予此服務主體必要的許可權。

限制

  • 每個來源 table只支援一個在線 table。
  • 在線 table 及其來源 table 最多可以有 1000 columns。
  • Columns 的數據類型 ARRAY、MAP 或 STRUCT 無法在在線 table中作為主鍵使用。
  • 如果 column 作為線上 table的主鍵使用,則會忽略來源 tablewherecolumn 包含 null values 的所有資料列。
  • 不支援將外部、系統和內部 tables 作為來源 tables。
  • 未啟用差異變更數據饋送的來源 tables 僅支援 快照sync 模式。
  • Delta Sharing tables 僅支援 快照sync 模式。
  • Catalog、schema和在線的 table 名稱 table 只能包含字母、數字和底線,且不得以數字開頭。 不允許使用虛線 (-)。
  • 字串類型的 Columns 限制為 64KB 長度。
  • Column 名稱長度限制為64個字元。
  • 資料列大小上限為 2MB。
  • 在公開預覽期間,Unity Catalog metastore 中所有線上 tables 的總大小為 2TB 的未壓縮使用者數據。
  • 每秒查詢數 (QPS) 上限為 12,000。 請連絡 Databricks 帳戶小組,以增加 limit。

疑難排解

我看不到 [在線建立 table] 選項

原因通常是您嘗試進行sync的對象(來源 table)的table不是支援的型別。 請確認來源 table的安全實體種類(如 [Catalog 檢視器] [詳細資料] 索引標籤所示)是下面支持的選項之一。

  • TABLE_EXTERNAL
  • TABLE_DELTA
  • TABLE_DELTA_EXTERNAL
  • TABLE_DELTASHARING
  • TABLE_DELTASHARING_MUTABLE
  • TABLE_STREAMING_LIVE_TABLE
  • TABLE_STANDARD
  • TABLE_FEATURE_STORE
  • TABLE_FEATURE_STORE_EXTERNAL
  • TABLE_VIEW
  • TABLE_VIEW_DELTASHARING
  • TABLE_MATERIALIZED_VIEW

當我建立在線 table 時,我無法 select觸發連續sync 模式

如果來源 table 未啟用 Delta 更動資料提要,或者它是檢視或具體化檢視,就會發生這種情況。 若要使用 累加式sync 模式,請在來源 table上啟用變更資料提要,或使用非檢視 table。

table和update 在線上失敗或狀態顯示為離線

若要開始針對此錯誤進行疑難解答,請按兩下 [ 總管] 中在線 [概觀] 索引卷標中顯示的管線標識碼。

在線 tables 管道故障

在顯示的管線 UI 頁面上,按一下顯示「無法解析流程 ‘__online_table’」的項目。

在線 tables 流程錯誤訊息

快顯視窗隨即出現,在 [錯誤詳細資料] 區段中提供了詳細資料。

在線 tables 錯誤 的詳細數據

錯誤的常見原因包括如下:

  • 來源 table 已刪除,或以相同名稱刪除和重新建立,而在線 table 正在同步處理。 這在持續在線 tables中尤為常見,因為它們會一直同步處理。

  • 來源 table 無法透過無伺服器運算存取,因為防火牆設定。 在這種情況下,[錯誤詳細資料] 區段可能會顯示錯誤訊息「無法啟動叢集 xxx 上的 DLT 服務...」。

  • 在線 tables 的總大小超過整個中繼存放區 limit的 2 TB(未壓縮大小)。 2 TB limit 是指以行導向格式擴充 Delta table 之後未壓縮的大小。 數據列格式的 table 大小可以明顯大於在 Catalog Explorer 中顯示的 Delta table 尺寸,後者是指在 column導向格式下壓縮 table 的大小。 視 table的內容而定,差異可能高達 100x。

    若要估計 Delta table未壓縮的數據列擴充大小,請使用下列來自無伺服器 SQL 倉儲的查詢。 查詢會以位元組為單位傳回估計展開 table 大小。 成功執行此查詢也會確認無伺服器計算可以存取來源 table。

    SELECT sum(length(to_csv(struct(*)))) FROM `source_table`;