Delen via


Aangepaste gegevensbronnen van PySpark

Belangrijk

Aangepaste PySpark-gegevensbronnen bevinden zich in Public Preview in Databricks Runtime 15.2 en hoger, en in versie 2 van de serverloze omgeving. Streaming-ondersteuning is beschikbaar in Databricks Runtime 15.3 en hoger.

Een PySpark DataSource wordt gemaakt door de Python DataSource-API (PySpark), waarmee u kunt lezen uit aangepaste gegevensbronnen en schrijven naar aangepaste gegevenssinks in Apache Spark met behulp van Python. U kunt aangepaste gegevensbronnen van PySpark gebruiken om aangepaste verbindingen met gegevenssystemen te definiëren en extra functionaliteit te implementeren om herbruikbare gegevensbronnen uit te bouwen.

Gegevensbronklasse

PySpark DataSource is een basisklasse die methoden biedt voor het maken van gegevenslezers en schrijvers.

De subklasse van de gegevensbron implementeren

Afhankelijk van uw gebruiksscenario moet het volgende worden geïmplementeerd door een subklasse om een gegevensbron leesbaar, beschrijfbaar of beide te maken:

Eigenschap of methode Beschrijving
name Vereist. De naam van de gegevensbron
schema Vereist. Het schema van de gegevensbron die moet worden gelezen of geschreven
reader() Moet een DataSourceReader retourneren om de gegevensbron leesbaar te maken (batch)
writer() Om de gegevenssink schrijfbaar te maken, moet een DataSourceWriter worden geretourneerd (batch).
streamReader() of simpleStreamReader() Moet een DataSourceStreamReader retourneren om de gegevensstroom leesbaar te maken (streaming)
streamWriter() Moet een DataSourceStreamWriter retourneren om de gegevensstroom beschrijfbaar te maken (streaming)

Notitie

De door de gebruiker gedefinieerde DataSource, DataSourceReader, DataSourceWriter, , DataSourceStreamReaderen DataSourceStreamWriterhun methoden moeten kunnen worden geserialiseerd. Met andere woorden, ze moeten een woordenboek of een genest woordenboek zijn die een primitief type bevatten.

De gegevensbron registreren

Nadat u de interface hebt geïmplementeerd, moet u deze registreren en kunt u deze laden of anderszins gebruiken, zoals wordt weergegeven in het volgende voorbeeld:

# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

Voorbeeld 1: Een PySpark-gegevensbron maken voor batchquery

Als u de mogelijkheden van PySpark DataSource-lezer wilt demonstreren, maakt u een gegevensbron waarmee voorbeeldgegevens worden gegenereerd met behulp van het faker Python-pakket. Raadpleeg de Faker-documentatie voor meer informatie over faker.

Installeer het faker pakket met behulp van de volgende opdracht:

%pip install faker

Stap 1: De voorbeeldgegevensbron definiëren

Definieer eerst uw nieuwe PySpark DataSource als een subklasse van DataSource met een naam, schema en lezer. De reader() methode moet worden gedefinieerd om te lezen uit een gegevensbron in een batchquery.

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

Stap 2: De lezer voor een batchquery implementeren

Implementeer vervolgens de lezerlogica om voorbeeldgegevens te genereren. Gebruik de geïnstalleerde faker-bibliotheek om elk veld in het schema te vullen.

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

Stap 3: De voorbeeldgegevensbron registreren en gebruiken

Als u de gegevensbron wilt gebruiken, moet u deze registreren. De FakeDataSource heeft standaard drie rijen en het schema bevat de volgende string velden: name, date, zipcode, state. In het volgende voorbeeld wordt de voorbeeldgegevensbron geregistreerd, geladen en uitgevoerd met de standaardinstellingen:

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

Alleen string velden worden ondersteund, maar u kunt een schema opgeven met velden die overeenkomen met de velden van faker pakketproviders om willekeurige gegevens te genereren voor testen en ontwikkelen. In het volgende voorbeeld wordt de gegevensbron geladen met name en company velden:

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

Als u de gegevensbron wilt laden met een aangepast aantal rijen, geeft u de numRows optie op. In het volgende voorbeeld worden vijf rijen opgegeven:

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

Voorbeeld 2: PySpark DataSource maken voor het streamen van lezen en schrijven

Maak een voorbeeldgegevensbron waarmee twee rijen in elke microbatch worden gegenereerd met behulp van het faker Python-pakket om pySpark DataSource-streamlezer- en schrijfmogelijkheden te demonstreren. Voor meer informatie over faker, zie de Faker-documentatie.

Installeer het faker pakket met behulp van de volgende opdracht:

%pip install faker

Stap 1: De voorbeeldgegevensbron definiëren

Definieer eerst uw nieuwe PySpark DataSource als een subklasse van DataSource met een naam, schema en methoden streamReader() en streamWriter().

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

Stap 2: De streamlezer implementeren

Implementeer vervolgens de voorbeeldlezer voor streaminggegevens waarmee twee rijen in elke microbatch worden gegenereerd. U kunt in plaats daarvan implementeren DataSourceStreamReaderof als de gegevensbron een lage doorvoer heeft en geen partitionering vereist, kunt u in plaats daarvan implementeren SimpleDataSourceStreamReader . simpleStreamReader() of streamReader() moet worden geïmplementeerd en simpleStreamReader() wordt alleen aangeroepen wanneer streamReader() niet is geïmplementeerd.

DataSourceStreamReader-implementatie

Het streamReader exemplaar heeft een gehele offset die met 2 toeneemt in elke microbatch, geïmplementeerd via de DataSourceStreamReader interface.

from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

Implementatie van SimpleDataSourceStreamReader

Het SimpleStreamReader exemplaar is hetzelfde als het FakeStreamReader exemplaar dat twee rijen in elke batch genereert, maar geïmplementeerd met de SimpleDataSourceStreamReader interface zonder partitionering.

class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

Stap 3: De stream writer implementeren

Implementeer nu de streamingschrijver. Deze streaminggegevensschrijver schrijft de metagegevens van elke microbatch naar een lokaal pad.

class SimpleCommitMessage:
   def __init__(self, partition_id: int, count: int):
       self.partition_id = partition_id
       self.count = count

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data, then returns the commit message of that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

Stap 4: De voorbeeldgegevensbron registreren en gebruiken

Als u de gegevensbron wilt gebruiken, moet u deze registreren. Nadat het is geregistreerd, kunt u het gebruiken in streamingquery's als bron of sink door een korte naam of volledige naam door te geven aan format(). In het volgende voorbeeld wordt eerst de gegevensbron geregistreerd, waarna een query wordt gestart die leest uit de gegevensbron in het voorbeeld en uitvoer naar de console genereert.

spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

U kunt ook in het volgende voorbeeld de voorbeeldstroom als sink gebruiken en een uitvoerpad opgeven:

query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

Probleemoplossing

Als de uitvoer de volgende fout is, biedt uw berekening geen ondersteuning voor aangepaste PySpark-gegevensbronnen. U moet Databricks Runtime 15.2 of hoger gebruiken.

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000