Gegevens laden met mozaïekstreaming
In dit artikel wordt beschreven hoe u Mosaic Streaming gebruikt om gegevens van Apache Spark te converteren naar een indeling die compatibel is met PyTorch.
Mosaic Streaming is een opensource-bibliotheek voor het laden van gegevens. Het maakt één knooppunt of gedistribueerde training en evaluatie mogelijk van deep learning-modellen uit gegevenssets die al zijn geladen als Apache Spark DataFrames. Mosaic Streaming ondersteunt voornamelijk Mosaic Composer, maar kan ook worden geïntegreerd met systeemeigen PyTorch, PyTorch Lightning en de TorchDistributor. Mosaic Streaming biedt een reeks voordelen ten opzichte van traditionele PyTorch DataLoaders, waaronder:
- Compatibiliteit met elk gegevenstype, inclusief afbeeldingen, tekst, video en multimodale gegevens.
- Ondersteuning voor grote cloudopslagproviders (AWS, OCI, GCS, Azure, Databricks UC Volume en alle S3-compatibele objectopslag, zoals Cloudflare R2, Coreweave, Backblaze b2, enzovoort)
- Het maximaliseren van juistheidsgaranties, prestaties, flexibiliteit en gebruiksgemak. Bekijk de pagina met belangrijke functies voor meer informatie.
Bekijk de documentatie over de streaming-API voor algemene informatie over Mosaic Streaming.
Notitie
Mosaic Streaming is vooraf geïnstalleerd in alle versies van de Databricks Runtime 15.2 ML en hoger.
Gegevens laden uit Spark DataFrames met mozaïekstreaming
Mosaic Streaming biedt een eenvoudige werkstroom voor het converteren van Apache Spark naar de MDS-indeling (Mosaic Data Shard), die vervolgens kan worden geladen voor gebruik in een gedistribueerde omgeving.
De aanbevolen werkstroom is:
- Gebruik Apache Spark om gegevens te laden en eventueel vooraf te verwerken.
- Gebruik
streaming.base.converters.dataframe_to_mds
dit diagram om het dataframe op schijf op te slaan voor tijdelijke opslag en/of naar een Unity Catalog-volume voor permanente opslag. Deze gegevens worden opgeslagen in de MDS-indeling en kunnen verder worden geoptimaliseerd met ondersteuning voor compressie en hashing. Geavanceerde gebruiksvoorbeelden kunnen ook het voorverwerken van gegevens omvatten met behulp van UDF's. Bekijk de zelfstudie spark DataFrame naar MDS voor meer informatie. - Gebruik
streaming.StreamingDataset
deze functie om de benodigde gegevens in het geheugen te laden.StreamingDataset
is een versie van PyTorch's IterableDataset die elastisch deterministische shuffling biedt, waardoor snel mid-epoch-hervatting mogelijk is. Bekijk de StreamingDataset-documentatie voor meer informatie. - Gebruik
streaming.StreamingDataLoader
dit om de benodigde gegevens te laden voor training/evaluatie/testen.StreamingDataLoader
is een versie van PyTorch's DataLoader die een extra controlepunt/hervattingsinterface biedt, waarvoor het het aantal voorbeelden bijhoudt dat door het model in deze rang wordt gezien.
Zie het volgende notebook voor een end-to-end-voorbeeld:
Het laden van gegevens van Spark naar PyTorch vereenvoudigen met behulp van Een Mozaïek Streaming-notebook
Probleemoplossing: Authenticatiefout
Als u de volgende fout ziet bij het laden van gegevens van een Unity Catalog-volume met behulp van StreamingDataset
, stelt u de omgevingsvariabelen in zoals hieronder wordt weergegeven.
ValueError: default auth: cannot configure default credentials, please check https://docs.databricks.com/en/dev-tools/auth.html#databricks-client-unified-authentication to configure credentials for your preferred authentication method.
Notitie
Als u deze fout ziet bij het uitvoeren van gedistribueerde training met behulp van TorchDistributor
, moet u ook de omgevingsvariabelen instellen op de werkknooppunten.
db_host = "https://your-databricks-host.databricks.com"
db_token = "YOUR API TOKEN" # Create a token with either method from https://docs.databricks.com/en/dev-tools/auth/index.html#databricks-authentication-methods
def your_training_function():
import os
os.environ['DATABRICKS_HOST'] = db_host
os.environ['DATABRICKS_TOKEN'] = db_token
# The above function can be distributed with TorchDistributor:
# from pyspark.ml.torch.distributor import TorchDistributor
# distributor = TorchDistributor(...)
# distributor.run(your_training_function)