
使用 Mosaic 串流載入資料

本文說明如何使用 Mosaic 串流將資料從 Apache Spark 轉換成與 PyTorch 相容的格式。

Mosaic 串流是開放原始碼資料載入程式庫。 可讓您從已作為 Apache Spark DataFrame 載入的資料集對深度學習模型進行單一節點或分散式訓練和評估。 Mosaic 串流主要支援 Mosaic Composer,但也與原生 PyTorch、PyTorch Lightning 和 TorchDistributor 整合。 相較於傳統的 PyTorch DataLoader,Mosaic 串流可提供一系列優勢,包括:

  • 與任何資料類型的相容性,包括影像、文字、影片和多模式資料。
  • 支援主要雲端儲存 providers(AWS、OCI、GCS、Azure、Databricks UC Volume,以及任何 S3 相容物件存放區,例如 Cloudflare R2、Coreweave、Backblaze b2 等)
  • 將正確性保證、效能、彈性和易用性最大化。 如需詳細資訊,請檢視其主要功能頁面。

如需 Mosaic 串流的一般資訊,請檢視串流 API 文件


Mosaic 串流已預先安裝到 Databricks Runtime 15.2 ML 和更新版本。

使用 Mosaic 串流從 Spark DataFrame 載入資料

Mosaic 串流提供從 Apache Spark 轉換成 Mosaic 資料分區 (MDS) 格式的直接工作流程,然後載入以用於分散式環境。


  1. 使用 Apache Spark 載入並選擇性地預先處理資料。
  2. 使用 streaming.base.converters.dataframe_to_mds 將資料框儲存到磁碟以進行短暫儲存,及/或儲存至 Unity Catalog 磁碟區以進行長期儲存。 此資料會以 MDS 格式儲存,並可透過支援壓縮和雜湊進一步最佳化。 進階使用案例也可以包含使用 UDF 的前置處理資料。 如需詳細資訊,請檢視 Spark DataFrame 至 MDS 教學課程
  3. 使用 streaming.StreamingDataset 將必要的資料載入記憶體。 StreamingDataset 是 PyTorch 的 IterableDataset 版本,其具有彈性確定性的隨機顯示功能,可實現快速的中期恢復。 如需詳細資訊,請參閱 StreamingDataset 文件
  4. 使用 streaming.StreamingDataLoader 載入進行訓練/評估/測試所需的資料。 StreamingDataLoader 是 PyTorch 的 DataLoader 版本,可提供額外的檢查點/繼續介面,它會追蹤此排名中模型所見的樣本數目。


使用 Mosaic 串流筆記本簡化從 Spark 至 PyTorch 的資料載入

Get 筆記本


如果您在使用 StreamingDataset從 Unity Catalog 磁碟區載入資料時看到下列錯誤,set 環境變數,如下所示。

ValueError: default auth: cannot configure default credentials, please check https://docs.databricks.com/en/dev-tools/auth.html#databricks-client-unified-authentication to configure credentials for your preferred authentication method.


如果您在使用 TorchDistributor執行分散式訓練時看到此錯誤,您也必須設定 set 工作節點上的環境變數。

db_host = "https://your-databricks-host.databricks.com"
db_token = "YOUR API TOKEN" # Create a token with either method from https://docs.databricks.com/en/dev-tools/auth/index.html#databricks-authentication-methods

def your_training_function():
  import os
  os.environ['DATABRICKS_HOST'] = db_host
  os.environ['DATABRICKS_TOKEN'] = db_token

# The above function can be distributed with TorchDistributor:
# from pyspark.ml.torch.distributor import TorchDistributor
# distributor = TorchDistributor(...)
# distributor.run(your_training_function)