Condividi tramite


Origini dati personalizzate PySpark

Importante

Le origini dati personalizzate PySpark sono disponibili in anteprima pubblica in Databricks Runtime 15.2 e versioni successive. Il supporto per lo streaming è disponibile in Databricks Runtime 15.3 e versioni successive.

Un'origine dati PySpark viene creata dall'API DataSource Python (PySpark), che consente la lettura da origini dati personalizzate e la scrittura in sink di dati personalizzati in Apache Spark usando Python. È possibile usare origini dati personalizzate PySpark per definire connessioni personalizzate ai sistemi dati e implementare funzionalità aggiuntive per creare origini dati riutilizzabili.

Classe DataSource

PySpark DataSource è una classe di base che fornisce metodi per creare lettori e writer di dati.

Implementare la sottoclasse dell'origine dati

A seconda del caso d'uso, è necessario implementare quanto segue da qualsiasi sottoclasse per rendere un'origine dati leggibile, scrivibile o entrambe:

Proprietà o metodo Descrizione
name Obbligatorio. Nome dell'origine dati
schema Obbligatorio. Schema dell'origine dati da leggere o scrivere
reader() Deve restituire un oggetto DataSourceReader per rendere leggibile l'origine dati (batch)
writer() Deve restituire un DataSourceWriter oggetto per rendere scrivibile il sink di dati (batch)
streamReader() oppure simpleStreamReader() Deve restituire un DataSourceStreamReader oggetto per rendere il flusso di dati leggibile (streaming)
streamWriter() Deve restituire un oggetto DataSourceStreamWriter per rendere scrivibile il flusso di dati (streaming)

Nota

I metodi definiti dall'utente DataSource, DataSourceStreamReaderDataSourceReaderDataSourceWriter, , DataSourceStreamWriter, e i relativi metodi devono essere serializzati. In altre parole, devono essere un dizionario o un dizionario annidato che contiene un tipo primitivo.

Registrare l'origine dati

Dopo aver implementato l'interfaccia, è necessario registrarla, quindi è possibile caricarla o usarla in altro modo, come illustrato nell'esempio seguente:

# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

Esempio 1: Creare un'origine dati PySpark per una query batch

Per illustrare le funzionalità di lettura di PySpark DataSource, creare un'origine dati che genera dati di esempio usando il faker pacchetto Python. Per altre informazioni su faker, vedere la documentazione di Faker.

Installare il pacchetto faker usando il comando seguente:

%pip install faker

Passaggio 1: Definire l'esempio datasource

Definire prima di tutto il nuovo DataSource PySpark come sottoclasse di DataSource con un nome, uno schema e un lettore. Il reader() metodo deve essere definito per leggere da un'origine dati in una query batch.

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

Passaggio 2: Implementare il lettore per una query batch

Implementare quindi la logica del lettore per generare dati di esempio. Usare la libreria installata faker per popolare ogni campo nello schema.

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

Passaggio 3: Registrare e usare l'origine dati di esempio

Per usare l'origine dati, registrarla. Per impostazione predefinita, ha FakeDataSource tre righe e lo schema include questi string campi: name, date, zipcode, state. Nell'esempio seguente vengono registrati, caricati e restituiti l'origine dati di esempio con le impostazioni predefinite:

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

Sono supportati solo string i campi, ma è possibile specificare uno schema con tutti i campi corrispondenti ai faker campi dei provider di pacchetti per generare dati casuali per il test e lo sviluppo. L'esempio seguente carica l'origine dati con name i campi e company :

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

Per caricare l'origine dati con un numero personalizzato di righe, specificare l'opzione numRows . Nell'esempio seguente vengono specificate 5 righe:

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

Esempio 2: Creare PySpark DataSource per lo streaming in lettura e scrittura

Per illustrare le funzionalità di lettura e scrittura del flusso PySpark DataSource, creare un'origine dati di esempio che genera due righe in ogni microbatch usando il faker pacchetto Python. Per altre informazioni su faker, vedere la documentazione di Faker.

Installare il pacchetto faker usando il comando seguente:

%pip install faker

Passaggio 1: Definire l'esempio datasource

Prima di tutto, definire il nuovo Oggetto DataSource PySpark come sottoclasse di DataSource con un nome, uno schema e i metodi streamReader() e streamWriter().

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

Passaggio 2: Implementare il lettore di flusso

Implementare quindi il lettore di dati di streaming di esempio che genera due righe in ogni microbatch. È possibile implementare DataSourceStreamReadero se l'origine dati ha una velocità effettiva ridotta e non richiede il partizionamento, è invece possibile implementare SimpleDataSourceStreamReader . È simpleStreamReader() necessario implementare o streamReader() e simpleStreamReader() viene richiamato solo quando streamReader() non viene implementato.

Implementazione di DataSourceStreamReader

L'istanza streamReader ha un offset integer che aumenta di 2 in ogni microbatch, implementato con l'interfaccia DataSourceStreamReader .

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

Implementazione di SimpleDataSourceStreamReader

L'istanza SimpleStreamReader è la stessa dell'istanza FakeStreamReader che genera due righe in ogni batch, ma implementata con l'interfaccia SimpleDataSourceStreamReader senza partizionamento.

class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

Passaggio 3: Implementare il writer di flusso

Implementare ora il writer di streaming. Questo writer di dati di streaming scrive le informazioni sui metadati di ogni microbatch in un percorso locale.

class SimpleCommitMessage(WriterCommitMessage):
   partition_id: int
   count: int

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data, then returns the commit message of that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

Passaggio 4: Registrare e usare l'origine dati di esempio

Per usare l'origine dati, registrarla. Dopo che è stato regsiterato, è possibile usarlo nelle query di streaming come origine o sink passando un nome breve o un nome completo a format(). Nell'esempio seguente viene registrata l'origine dati, quindi viene avviata una query che legge dall'origine dati di esempio e restituisce output nella console:

spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

In alternativa, l'esempio seguente usa il flusso di esempio come sink e specifica un percorso di output:

query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

Risoluzione dei problemi

Se l'output è l'errore seguente, il calcolo non supporta le origini dati personalizzate PySpark. È necessario usare Databricks Runtime 15.2 o versione successiva.

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000