Origini dati personalizzate PySpark
Importante
Le origini dati personalizzate PySpark sono disponibili in anteprima pubblica in Databricks Runtime 15.2 e versioni successive. Il supporto per lo streaming è disponibile in Databricks Runtime 15.3 e versioni successive.
Un'origine dati PySpark viene creata dall'API DataSource Python (PySpark), che consente la lettura da origini dati personalizzate e la scrittura in sink di dati personalizzati in Apache Spark usando Python. È possibile usare origini dati personalizzate PySpark per definire connessioni personalizzate ai sistemi dati e implementare funzionalità aggiuntive per creare origini dati riutilizzabili.
Classe DataSource
PySpark DataSource è una classe di base che fornisce metodi per creare lettori e writer di dati.
Implementare la sottoclasse dell'origine dati
A seconda del caso d'uso, è necessario implementare quanto segue da qualsiasi sottoclasse per rendere un'origine dati leggibile, scrivibile o entrambe:
Proprietà o metodo | Descrizione |
---|---|
name |
Obbligatorio. Nome dell'origine dati |
schema |
Obbligatorio. Schema dell'origine dati da leggere o scrivere |
reader() |
Deve restituire un oggetto DataSourceReader per rendere leggibile l'origine dati (batch) |
writer() |
Deve restituire un DataSourceWriter oggetto per rendere scrivibile il sink di dati (batch) |
streamReader() oppure simpleStreamReader() |
Deve restituire un DataSourceStreamReader oggetto per rendere il flusso di dati leggibile (streaming) |
streamWriter() |
Deve restituire un oggetto DataSourceStreamWriter per rendere scrivibile il flusso di dati (streaming) |
Nota
I metodi definiti dall'utente DataSource
, DataSourceStreamReader
DataSourceReader
DataSourceWriter
, , DataSourceStreamWriter
, e i relativi metodi devono essere serializzati. In altre parole, devono essere un dizionario o un dizionario annidato che contiene un tipo primitivo.
Registrare l'origine dati
Dopo aver implementato l'interfaccia, è necessario registrarla, quindi è possibile caricarla o usarla in altro modo, come illustrato nell'esempio seguente:
# Register the data source
spark.dataSource.register(MyDataSourceClass)
# Read from a custom data source
spark.read.format("my_datasource_name").load().show()
Esempio 1: Creare un'origine dati PySpark per una query batch
Per illustrare le funzionalità di lettura di PySpark DataSource, creare un'origine dati che genera dati di esempio usando il faker
pacchetto Python. Per altre informazioni su faker
, vedere la documentazione di Faker.
Installare il pacchetto faker
usando il comando seguente:
%pip install faker
Passaggio 1: Definire l'esempio datasource
Definire prima di tutto il nuovo DataSource PySpark come sottoclasse di DataSource
con un nome, uno schema e un lettore. Il reader()
metodo deve essere definito per leggere da un'origine dati in una query batch.
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""
@classmethod
def name(cls):
return "fake"
def schema(self):
return "name string, date string, zipcode string, state string"
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
Passaggio 2: Implementare il lettore per una query batch
Implementare quindi la logica del lettore per generare dati di esempio. Usare la libreria installata faker
per popolare ogni campo nello schema.
class FakeDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options
def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()
# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)
Passaggio 3: Registrare e usare l'origine dati di esempio
Per usare l'origine dati, registrarla. Per impostazione predefinita, ha FakeDataSource
tre righe e lo schema include questi string
campi: name
, date
, zipcode
, state
. Nell'esempio seguente vengono registrati, caricati e restituiti l'origine dati di esempio con le impostazioni predefinite:
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+
Sono supportati solo string
i campi, ma è possibile specificare uno schema con tutti i campi corrispondenti ai faker
campi dei provider di pacchetti per generare dati casuali per il test e lo sviluppo. L'esempio seguente carica l'origine dati con name
i campi e company
:
spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+
Per caricare l'origine dati con un numero personalizzato di righe, specificare l'opzione numRows
. Nell'esempio seguente vengono specificate 5 righe:
spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+
Esempio 2: Creare PySpark DataSource per lo streaming in lettura e scrittura
Per illustrare le funzionalità di lettura e scrittura del flusso PySpark DataSource, creare un'origine dati di esempio che genera due righe in ogni microbatch usando il faker
pacchetto Python. Per altre informazioni su faker
, vedere la documentazione di Faker.
Installare il pacchetto faker
usando il comando seguente:
%pip install faker
Passaggio 1: Definire l'esempio datasource
Prima di tutto, definire il nuovo Oggetto DataSource PySpark come sottoclasse di DataSource
con un nome, uno schema e i metodi streamReader()
e streamWriter()
.
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType
class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""
@classmethod
def name(cls):
return "fakestream"
def schema(self):
return "name string, state string"
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
Passaggio 2: Implementare il lettore di flusso
Implementare quindi il lettore di dati di streaming di esempio che genera due righe in ogni microbatch. È possibile implementare DataSourceStreamReader
o se l'origine dati ha una velocità effettiva ridotta e non richiede il partizionamento, è invece possibile implementare SimpleDataSourceStreamReader
. È simpleStreamReader()
necessario implementare o streamReader()
e simpleStreamReader()
viene richiamato solo quando streamReader()
non viene implementato.
Implementazione di DataSourceStreamReader
L'istanza streamReader
ha un offset integer che aumenta di 2 in ogni microbatch, implementato con l'interfaccia DataSourceStreamReader
.
class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end
class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}
def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))
Implementazione di SimpleDataSourceStreamReader
L'istanza SimpleStreamReader
è la stessa dell'istanza FakeStreamReader
che genera due righe in ogni batch, ma implementata con l'interfaccia SimpleDataSourceStreamReader
senza partizionamento.
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass
Passaggio 3: Implementare il writer di flusso
Implementare ora il writer di streaming. Questo writer di dati di streaming scrive le informazioni sui metadati di ogni microbatch in un percorso locale.
class SimpleCommitMessage(WriterCommitMessage):
partition_id: int
count: int
class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None
def write(self, iterator):
"""
Writes the data, then returns the commit message of that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")
def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")
Passaggio 4: Registrare e usare l'origine dati di esempio
Per usare l'origine dati, registrarla. Dopo che è stato regsiterato, è possibile usarlo nelle query di streaming come origine o sink passando un nome breve o un nome completo a format()
. Nell'esempio seguente viene registrata l'origine dati, quindi viene avviata una query che legge dall'origine dati di esempio e restituisce output nella console:
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()
In alternativa, l'esempio seguente usa il flusso di esempio come sink e specifica un percorso di output:
query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")
Risoluzione dei problemi
Se l'output è l'errore seguente, il calcolo non supporta le origini dati personalizzate PySpark. È necessario usare Databricks Runtime 15.2 o versione successiva.
Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000