使用 Mosaic 串流載入資料
本文說明如何使用 Mosaic 串流將資料從 Apache Spark 轉換成與 PyTorch 相容的格式。
Mosaic 串流是開放原始碼資料載入程式庫。 可讓您從已作為 Apache Spark DataFrame 載入的資料集對深度學習模型進行單一節點或分散式訓練和評估。 Mosaic 串流主要支援 Mosaic Composer,但也與原生 PyTorch、PyTorch Lightning 和 TorchDistributor 整合。 相較於傳統的 PyTorch DataLoader,Mosaic 串流可提供一系列優勢,包括:
- 與任何資料類型的相容性,包括影像、文字、影片和多模式資料。
- 支援主要雲端儲存 providers(AWS、OCI、GCS、Azure、Databricks UC Volume,以及任何 S3 相容物件存放區,例如 Cloudflare R2、Coreweave、Backblaze b2 等)
- 將正確性保證、效能、彈性和易用性最大化。 如需詳細資訊,請檢視其主要功能頁面。
如需 Mosaic 串流的一般資訊,請檢視串流 API 文件。
注意
Mosaic 串流已預先安裝到 Databricks Runtime 15.2 ML 和更新版本。
使用 Mosaic 串流從 Spark DataFrame 載入資料
Mosaic 串流提供從 Apache Spark 轉換成 Mosaic 資料分區 (MDS) 格式的直接工作流程,然後載入以用於分散式環境。
建議的工作流程為:
- 使用 Apache Spark 載入並選擇性地預先處理資料。
- 使用
streaming.base.converters.dataframe_to_mds
將資料框儲存到磁碟以進行短暫儲存,及/或儲存至 Unity Catalog 磁碟區以進行長期儲存。 此資料會以 MDS 格式儲存,並可透過支援壓縮和雜湊進一步最佳化。 進階使用案例也可以包含使用 UDF 的前置處理資料。 如需詳細資訊,請檢視 Spark DataFrame 至 MDS 教學課程。 - 使用
streaming.StreamingDataset
將必要的資料載入記憶體。StreamingDataset
是 PyTorch 的 IterableDataset 版本,其具有彈性確定性的隨機顯示功能,可實現快速的中期恢復。 如需詳細資訊,請參閱 StreamingDataset 文件。 - 使用
streaming.StreamingDataLoader
載入進行訓練/評估/測試所需的資料。StreamingDataLoader
是 PyTorch 的 DataLoader 版本,可提供額外的檢查點/繼續介面,它會追蹤此排名中模型所見的樣本數目。
如需端對端範例,請參閱下列筆記本:
使用 Mosaic 串流筆記本簡化從 Spark 至 PyTorch 的資料載入
疑難解答:驗證錯誤
如果您在使用 StreamingDataset
從 Unity Catalog 磁碟區載入資料時看到下列錯誤,set 環境變數,如下所示。
ValueError: default auth: cannot configure default credentials, please check https://docs.databricks.com/en/dev-tools/auth.html#databricks-client-unified-authentication to configure credentials for your preferred authentication method.
注意
如果您在使用 TorchDistributor
執行分散式訓練時看到此錯誤,您也必須設定 set 工作節點上的環境變數。
db_host = "https://your-databricks-host.databricks.com"
db_token = "YOUR API TOKEN" # Create a token with either method from https://docs.databricks.com/en/dev-tools/auth/index.html#databricks-authentication-methods
def your_training_function():
import os
os.environ['DATABRICKS_HOST'] = db_host
os.environ['DATABRICKS_TOKEN'] = db_token
# The above function can be distributed with TorchDistributor:
# from pyspark.ml.torch.distributor import TorchDistributor
# distributor = TorchDistributor(...)
# distributor.run(your_training_function)