Benutzerdefinierte PySpark-Datenquellen
Wichtig
Benutzerdefinierte PySpark-Datenquellen befinden sich in der öffentlichen Vorschau in Databricks Runtime 15.2 und höher. Streamingunterstützung ist in der Databricks Runtime 15.3 und höher verfügbar.
Eine PySpark DataSource-Datenquelle wird von der Python (PySpark) DataSource-API erstellt, die das Lesen von benutzerdefinierten Datenquellen und das Schreiben in benutzerdefinierte Datensenken in Apache Spark mithilfe von Python ermöglicht. Mithilfe von benutzerdefinierten PySpark-Datenquellen können Sie benutzerdefinierte Verbindungen mit Datensystemen definieren und zusätzliche Funktionalität implementieren, um wiederverwendbare Datenquellen zu erstellen.
DataSource-Klasse
PySpark DataSource ist eine Basisklasse, die Methoden zum Erstellen von Modulen zum Lesen und Schreiben von Daten bereitstellt.
Implementieren der Datenquellenunterklasse
Abhängig von Ihrem Anwendungsfall muss von jeder Unterklasse Folgendes implementiert werden, damit eine Datenquelle entweder lesbar, schreibbar oder beides ist:
Eigenschaft oder Methode | Beschreibung |
---|---|
name |
Erforderlich. Der Name der Datenquelle. |
schema |
Erforderlich. Das Schema der Datenquelle, die gelesen oder geschrieben werden soll |
reader() |
Muss einen DataSourceReader zurückzugeben, um die Datenquelle lesbar zu machen (Batch) |
writer() |
Muss einen DataSourceWriter zurückzugeben, um die Datensenke schreibbar zu machen (Batch) |
streamReader() oder simpleStreamReader() |
Muss einen DataSourceStreamReader zurückgeben, damit der Datenstrom lesbar ist (Streaming) |
streamWriter() |
Muss einen DataSourceStreamWriter zurückgeben, um den Datenstrom schreibbar zu machen (Streaming) |
Hinweis
Die benutzerdefinierten Werte DataSource
, DataSourceReader
, DataSourceWriter
, DataSourceStreamReader
, DataSourceStreamWriter
und ihre Methoden müssen serialisiert werden können. Mit anderen Worten: Sie müssen ein Wörterbuch oder ein geschachteltes Wörterbuch sein, das einen primitiven Typ enthält.
Registrieren der Datenquelle
Nach der Implementierung der Schnittstelle müssen Sie sie registrieren, und Sie können sie laden oder anderweitig verwenden, wie im folgenden Beispiel gezeigt:
# Register the data source
spark.dataSource.register(MyDataSourceClass)
# Read from a custom data source
spark.read.format("my_datasource_name").load().show()
Beispiel 1: Erstellen einer PySpark DataSource-Datenquelle für eine Batchabfrage
Um PySpark DataSource-Lesefunktionen zu veranschaulichen, erstellen Sie eine Datenquelle, die Beispieldaten mithilfe des Python-Pakets faker
generiert. Weitere Informationen zu faker
finden Sie in der Faker-Dokumentation.
Installieren Sie das Paket faker
mit dem folgenden Befehl:
%pip install faker
Schritt 1: Definieren des DataSource-Beispiels
Definieren Sie zunächst Ihre neue PySpark DataSource-Datenquelle als Unterklasse von DataSource
mit einem Namen, Schema und Lesemodul. Die reader()
-Methode muss definiert werden, um aus einer Datenquelle in einer Batchabfrage zu lesen.
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""
@classmethod
def name(cls):
return "fake"
def schema(self):
return "name string, date string, zipcode string, state string"
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
Schritt 2: Implementieren des Lesemoduls für eine Batchabfrage
Implementieren Sie als Nächstes die Leselogik, um Beispieldaten zu generieren. Verwenden Sie die installierte faker
-Bibliothek, um jedes Feld im Schema auszufüllen.
class FakeDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options
def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()
# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)
Schritt 3: Registrieren und Verwenden der Beispieldatenquelle
Um die Datenquelle zu verwenden, registrieren Sie sie. Standardmäßig weist FakeDataSource
drei Zeilen auf, und das Schema enthält die folgenden string
-Felder: name
, date
, zipcode
, state
. Im folgenden Beispiel wird die Beispieldatenquelle mit den Standardwerten registriert, geladen und ausgegeben:
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+
Nur string
-Felder werden unterstützt, aber Sie können ein Schema mit allen Feldern angeben, die den Feldern des faker
-Paketanbieters entsprechen, um zufällige Daten für Tests und Entwicklung zu generieren. Im folgenden Beispiel wird die Datenquelle mit den Feldern name
und company
geladen:
spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+
Wenn Sie die Datenquelle mit einer benutzerdefinierten Anzahl von Zeilen laden möchten, geben Sie die Option numRows
an. Im folgenden Beispiel werden 5 Zeilen angegeben:
spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+
Beispiel 2: Erstellen von PySpark DataSource zum Streamen von Lese- und Schreibzugriff
Um PySpark DataSource-Streamreader- und -writerfähigkeiten zu veranschaulichen, erstellen Sie eine Beispiel-Datenquelle, die zwei mithilfe des Python-Pakets faker
in jedem Mikrobatch zwei Reihen generiert. Weitere Informationen zu faker
finden Sie in der Faker-Dokumentation.
Installieren Sie das Paket faker
mit dem folgenden Befehl:
%pip install faker
Schritt 1: Definieren des DataSource-Beispiels
Definieren Sie als Nächstes Ihre neue PySpark DataSource-Datenquelle als Unterklasse von DataSource
mit einem Namen, Schema und die Methoden streamReader()
und streamWriter()
.
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType
class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""
@classmethod
def name(cls):
return "fakestream"
def schema(self):
return "name string, state string"
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
Schritt 2: Implementieren des Streamreaders
Implementieren Sie als Nächstes den Beispiel-Streamingdatenleser, der zwei Zeilen in jedem Mikrobatch generiert. Sie können DataSourceStreamReader
implementieren oder wenn die Datenquelle einen niedrigen Durchsatz aufweist und keine Partitionierung erfordert, können Sie stattdessen SimpleDataSourceStreamReader
implementieren. Es muss entweder simpleStreamReader()
oder streamReader()
implementiert werden und simpleStreamReader()
wird nur aufgerufen, wenn streamReader()
nicht implementiert ist.
Implementierung des DataSourceStreamReader
Die streamReader
-Instanz weist einen ganzzahligen Offset auf, der bei jedem Mikrobatch, der mit der DataSourceStreamReader
-Schnittstelle implementiert wird, um 2 erhöht wird.
from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json
class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end
class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}
def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))
Implementierung des SimpleDataSourceStreamReader
Die SimpleStreamReader
-Instanz ist identisch mit der FakeStreamReader
-Instanz, die in jedem Batch zwei Zeilen generiert, aber ohne Partitionierung mit der SimpleDataSourceStreamReader
-Schnittstelle implementiert wird.
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass
Schritt 3: Implementieren des Streamwriters
Implementieren Sie nun den Streamingwriter. Dieser Streamingdaten-Writer schreibt die Metadateninformationen der einzelnen Mikrobatches in einen lokalen Pfad.
class SimpleCommitMessage:
def __init__(self, partition_id: int, count: int):
self.partition_id = partition_id
self.count = count
class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None
def write(self, iterator):
"""
Writes the data, then returns the commit message of that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")
def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")
Schritt 4: Registrieren und Verwenden der Beispieldatenquelle
Um die Datenquelle zu verwenden, registrieren Sie sie. Nachdem er registriert wurde, können Sie ihn in Streamingabfragen als Quelle oder Senke verwenden, indem Sie einen Kurz- oder vollständigen Namen an format()
übergeben. Das folgende Beispiel registriert die Datenquelle und startet dann eine Abfrage, die aus der Beispieldatenquelle liest und auf der Konsole ausgibt:
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()
Alternativ dazu verwendet das folgende Beispiel den Beispielstream als Senke und gibt einen Ausgabepfad an:
query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")
Problembehandlung
Wenn der folgende Fehler ausgegeben wird, unterstützen Ihre Computeressourcen keine benutzerdefinierten PySpark-Datenquellen. Sie müssen Databricks Runtime 15.2 oder höher verwenden.
Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000