Dela via


Anpassade datakällor i PySpark

Viktigt!

Anpassade PySpark-datakällor finns i offentlig förhandsversion i Databricks Runtime 15.2 och senare. Direktuppspelningsstöd är tillgängligt i Databricks Runtime 15.3 och senare.

En PySpark DataSource skapas av Python (PySpark) DataSource-API:et, som gör det möjligt att läsa från anpassade datakällor och skriva till anpassade datamottagare i Apache Spark med python. Du kan använda anpassade PySpark-datakällor för att definiera anpassade anslutningar till datasystem och implementera ytterligare funktioner för att skapa återanvändbara datakällor.

DataSource-klass

PySpark DataSource är en basklass som tillhandahåller metoder för att skapa dataläsare och författare.

Implementera underklassen datakälla

Beroende på ditt användningsfall måste följande implementeras av alla underklasser för att göra en datakälla antingen läsbar, skrivbar eller både och:

Egenskap eller metod beskrivning
name Obligatoriskt. Namnet på datakällan
schema Obligatoriskt. Schemat för datakällan som ska läsas eller skrivas
reader() Måste returnera en DataSourceReader för att göra datakällan läsbar (batch)
writer() Måste returnera en DataSourceWriter för att göra datamottagaren skrivbar (batch)
streamReader() eller simpleStreamReader() Måste returnera en DataSourceStreamReader för att göra dataströmmen läsbar (direktuppspelning)
streamWriter() Måste returnera en DataSourceStreamWriter för att göra dataströmmen skrivbar (direktuppspelning)

Kommentar

De användardefinierade DataSourcemetoderna , DataSourceReader, DataSourceWriter, DataSourceStreamReader, DataSourceStreamWriteroch deras metoder måste kunna serialiseras. Med andra ord måste de vara en ordlista eller kapslad ordlista som innehåller en primitiv typ.

Registrera datakällan

När du har implementerat gränssnittet måste du registrera det, sedan kan du läsa in eller på annat sätt använda det som visas i följande exempel:

# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

Exempel 1: Skapa en PySpark DataSource för batchfråga

Om du vill demonstrera PySpark DataSource-läsarfunktioner skapar du en datakälla som genererar exempeldata med hjälp av Python-paketet faker . Mer information om fakerfinns i Faker-dokumentationen.

faker Installera paketet med följande kommando:

%pip install faker

Steg 1: Definiera exemplet DataSource

Definiera först din nya PySpark DataSource som en underklass med DataSource namn, schema och läsare. Metoden reader() måste definieras för att läsa från en datakälla i en batchfråga.

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

Steg 2: Implementera läsaren för en batchfråga

Implementera sedan läsarlogik för att generera exempeldata. Använd det installerade faker biblioteket för att fylla i varje fält i schemat.

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

Steg 3: Registrera och använda exempeldatakällan

Om du vill använda datakällan registrerar du den. Som standard har de FakeDataSource tre raderna och schemat innehåller följande string fält: name, date, zipcode, . state Följande exempel registrerar, läser in och matar ut exempeldatakällan med standardvärdena:

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

Endast string fält stöds, men du kan ange ett schema med alla fält som motsvarar faker paketleverantörernas fält för att generera slumpmässiga data för testning och utveckling. I följande exempel läses datakällan in med name och company fält:

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

Om du vill läsa in datakällan med ett anpassat antal rader anger du alternativet numRows . I följande exempel anges 5 rader:

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

Exempel 2: Skapa PySpark DataSource för direktuppspelning av läsning och skrivning

Skapa en exempeldatakälla som genererar två rader i varje mikrobatch med hjälp av faker Python-paketet för att demonstrera strömläsare och skrivarfunktioner i PySpark DataSource. Mer information om fakerfinns i Faker-dokumentationen.

faker Installera paketet med följande kommando:

%pip install faker

Steg 1: Definiera exemplet DataSource

Definiera först din nya PySpark DataSource som en underklass av DataSource med ett namn, schema och metoder streamReader() och streamWriter().

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

Steg 2: Implementera strömläsaren

Implementera sedan exempelläsaren för strömmande data som genererar två rader i varje mikrobatch. Du kan implementera DataSourceStreamReader, eller om datakällan har lågt dataflöde och inte kräver partitionering, kan du implementera SimpleDataSourceStreamReader i stället. Antingen simpleStreamReader() eller streamReader() måste implementeras och simpleStreamReader() anropas endast när streamReader() inte har implementerats.

Implementering av DataSourceStreamReader

Instansen streamReader har en heltalsförskjutning som ökar med 2 i varje mikrobatch, implementerad med DataSourceStreamReader gränssnittet.

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

Implementering av SimpleDataSourceStreamReader

Instansen SimpleStreamReader är samma som den FakeStreamReader instans som genererar två rader i varje batch, men implementeras med SimpleDataSourceStreamReader gränssnittet utan partitionering.

class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

Steg 3: Implementera strömskrivaren

Implementera nu strömningsskrivaren. Den här strömmande dataskrivaren skriver metadatainformationen för varje mikrobatch till en lokal sökväg.

class SimpleCommitMessage(WriterCommitMessage):
   partition_id: int
   count: int

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data, then returns the commit message of that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

Steg 4: Registrera och använda exempeldatakällan

Om du vill använda datakällan registrerar du den. När den är regsitered kan du använda den i strömmande frågor som källa eller mottagare genom att skicka ett kort namn eller fullständigt namn till format(). I följande exempel registreras datakällan och sedan startas en fråga som läser från exempeldatakällan och utdata till konsolen:

spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

I följande exempel används också exempelströmmen som en mottagare och anger en utdatasökväg:

query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

Felsökning

Om utdata är följande fel stöder din beräkning inte anpassade PySpark-datakällor. Du måste använda Databricks Runtime 15.2 eller senare.

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000