共用方式為


使用 Mosaic 串流載入資料

本文說明如何使用 Mosaic 串流將資料從 Apache Spark 轉換成與 PyTorch 相容的格式。

Mosaic 串流是開放原始碼資料載入程式庫。 可讓您從已作為 Apache Spark DataFrame 載入的資料集對深度學習模型進行單一節點或分散式訓練和評估。 Mosaic 串流主要支援 Mosaic Composer,但也與原生 PyTorch、PyTorch Lightning 和 TorchDistributor 整合。 相較於傳統的 PyTorch DataLoader,Mosaic 串流可提供一系列優勢,包括:

  • 與任何資料類型的相容性,包括影像、文字、影片和多模式資料。
  • 支援主要雲端儲存 providers(AWS、OCI、GCS、Azure、Databricks UC Volume,以及任何 S3 相容物件存放區,例如 Cloudflare R2、Coreweave、Backblaze b2 等)
  • 將正確性保證、效能、彈性和易用性最大化。 如需詳細資訊,請檢視其主要功能頁面。

如需 Mosaic 串流的一般資訊,請檢視串流 API 文件

注意

Mosaic 串流已預先安裝到 Databricks Runtime 15.2 ML 和更新版本。

使用 Mosaic 串流從 Spark DataFrame 載入資料

Mosaic 串流提供從 Apache Spark 轉換成 Mosaic 資料分區 (MDS) 格式的直接工作流程,然後載入以用於分散式環境。

建議的工作流程為:

  1. 使用 Apache Spark 載入並選擇性地預先處理資料。
  2. 使用 streaming.base.converters.dataframe_to_mds 將資料框儲存到磁碟以進行短暫儲存,及/或儲存至 Unity Catalog 磁碟區以進行長期儲存。 此資料會以 MDS 格式儲存,並可透過支援壓縮和雜湊進一步最佳化。 進階使用案例也可以包含使用 UDF 的前置處理資料。 如需詳細資訊,請檢視 Spark DataFrame 至 MDS 教學課程
  3. 使用 streaming.StreamingDataset 將必要的資料載入記憶體。 StreamingDataset 是 PyTorch 的 IterableDataset 版本,其具有彈性確定性的隨機顯示功能,可實現快速的中期恢復。 如需詳細資訊,請參閱 StreamingDataset 文件
  4. 使用 streaming.StreamingDataLoader 載入進行訓練/評估/測試所需的資料。 StreamingDataLoader 是 PyTorch 的 DataLoader 版本,可提供額外的檢查點/繼續介面,它會追蹤此排名中模型所見的樣本數目。

如需端對端範例,請參閱下列筆記本:

使用 Mosaic 串流筆記本簡化從 Spark 至 PyTorch 的資料載入

Get 筆記本

疑難解答:驗證錯誤

如果您在使用 StreamingDataset從 Unity Catalog 磁碟區載入資料時看到下列錯誤,set 環境變數,如下所示。

ValueError: default auth: cannot configure default credentials, please check https://docs.databricks.com/en/dev-tools/auth.html#databricks-client-unified-authentication to configure credentials for your preferred authentication method.

注意

如果您在使用 TorchDistributor執行分散式訓練時看到此錯誤,您也必須設定 set 工作節點上的環境變數。

db_host = "https://your-databricks-host.databricks.com"
db_token = "YOUR API TOKEN" # Create a token with either method from https://docs.databricks.com/en/dev-tools/auth/index.html#databricks-authentication-methods

def your_training_function():
  import os
  os.environ['DATABRICKS_HOST'] = db_host
  os.environ['DATABRICKS_TOKEN'] = db_token

# The above function can be distributed with TorchDistributor:
# from pyspark.ml.torch.distributor import TorchDistributor
# distributor = TorchDistributor(...)
# distributor.run(your_training_function)