Aangepaste gegevensbronnen van PySpark
Belangrijk
Aangepaste PySpark-gegevensbronnen bevinden zich in Public Preview in Databricks Runtime 15.2 en hoger, en in versie 2 van de serverloze omgeving. Streaming-ondersteuning is beschikbaar in Databricks Runtime 15.3 en hoger.
Een PySpark DataSource wordt gemaakt door de Python DataSource-API (PySpark), waarmee u kunt lezen uit aangepaste gegevensbronnen en schrijven naar aangepaste gegevenssinks in Apache Spark met behulp van Python. U kunt aangepaste gegevensbronnen van PySpark gebruiken om aangepaste verbindingen met gegevenssystemen te definiëren en extra functionaliteit te implementeren om herbruikbare gegevensbronnen uit te bouwen.
Gegevensbronklasse
PySpark DataSource is een basisklasse die methoden biedt voor het maken van gegevenslezers en schrijvers.
De subklasse van de gegevensbron implementeren
Afhankelijk van uw gebruiksscenario moet het volgende worden geïmplementeerd door een subklasse om een gegevensbron leesbaar, beschrijfbaar of beide te maken:
Eigenschap of methode | Beschrijving |
---|---|
name |
Vereist. De naam van de gegevensbron |
schema |
Vereist. Het schema van de gegevensbron die moet worden gelezen of geschreven |
reader() |
Moet een DataSourceReader retourneren om de gegevensbron leesbaar te maken (batch) |
writer() |
Om de gegevenssink schrijfbaar te maken, moet een DataSourceWriter worden geretourneerd (batch). |
streamReader() of simpleStreamReader() |
Moet een DataSourceStreamReader retourneren om de gegevensstroom leesbaar te maken (streaming) |
streamWriter() |
Moet een DataSourceStreamWriter retourneren om de gegevensstroom beschrijfbaar te maken (streaming) |
Notitie
De door de gebruiker gedefinieerde DataSource
, DataSourceReader
, DataSourceWriter
, , DataSourceStreamReader
en DataSourceStreamWriter
hun methoden moeten kunnen worden geserialiseerd. Met andere woorden, ze moeten een woordenboek of een genest woordenboek zijn die een primitief type bevatten.
De gegevensbron registreren
Nadat u de interface hebt geïmplementeerd, moet u deze registreren en kunt u deze laden of anderszins gebruiken, zoals wordt weergegeven in het volgende voorbeeld:
# Register the data source
spark.dataSource.register(MyDataSourceClass)
# Read from a custom data source
spark.read.format("my_datasource_name").load().show()
Voorbeeld 1: Een PySpark-gegevensbron maken voor batchquery
Als u de mogelijkheden van PySpark DataSource-lezer wilt demonstreren, maakt u een gegevensbron waarmee voorbeeldgegevens worden gegenereerd met behulp van het faker
Python-pakket. Raadpleeg de Faker-documentatie voor meer informatie over faker
.
Installeer het faker
pakket met behulp van de volgende opdracht:
%pip install faker
Stap 1: De voorbeeldgegevensbron definiëren
Definieer eerst uw nieuwe PySpark DataSource als een subklasse van DataSource
met een naam, schema en lezer. De reader()
methode moet worden gedefinieerd om te lezen uit een gegevensbron in een batchquery.
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""
@classmethod
def name(cls):
return "fake"
def schema(self):
return "name string, date string, zipcode string, state string"
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
Stap 2: De lezer voor een batchquery implementeren
Implementeer vervolgens de lezerlogica om voorbeeldgegevens te genereren. Gebruik de geïnstalleerde faker
-bibliotheek om elk veld in het schema te vullen.
class FakeDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options
def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()
# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)
Stap 3: De voorbeeldgegevensbron registreren en gebruiken
Als u de gegevensbron wilt gebruiken, moet u deze registreren. De FakeDataSource
heeft standaard drie rijen en het schema bevat de volgende string
velden: name
, date
, zipcode
, state
. In het volgende voorbeeld wordt de voorbeeldgegevensbron geregistreerd, geladen en uitgevoerd met de standaardinstellingen:
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+
Alleen string
velden worden ondersteund, maar u kunt een schema opgeven met velden die overeenkomen met de velden van faker
pakketproviders om willekeurige gegevens te genereren voor testen en ontwikkelen. In het volgende voorbeeld wordt de gegevensbron geladen met name
en company
velden:
spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+
Als u de gegevensbron wilt laden met een aangepast aantal rijen, geeft u de numRows
optie op. In het volgende voorbeeld worden vijf rijen opgegeven:
spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+
Voorbeeld 2: PySpark DataSource maken voor het streamen van lezen en schrijven
Maak een voorbeeldgegevensbron waarmee twee rijen in elke microbatch worden gegenereerd met behulp van het faker
Python-pakket om pySpark DataSource-streamlezer- en schrijfmogelijkheden te demonstreren. Voor meer informatie over faker
, zie de Faker-documentatie.
Installeer het faker
pakket met behulp van de volgende opdracht:
%pip install faker
Stap 1: De voorbeeldgegevensbron definiëren
Definieer eerst uw nieuwe PySpark DataSource als een subklasse van DataSource
met een naam, schema en methoden streamReader()
en streamWriter()
.
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType
class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""
@classmethod
def name(cls):
return "fakestream"
def schema(self):
return "name string, state string"
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
Stap 2: De streamlezer implementeren
Implementeer vervolgens de voorbeeldlezer voor streaminggegevens waarmee twee rijen in elke microbatch worden gegenereerd. U kunt in plaats daarvan implementeren DataSourceStreamReader
of als de gegevensbron een lage doorvoer heeft en geen partitionering vereist, kunt u in plaats daarvan implementeren SimpleDataSourceStreamReader
.
simpleStreamReader()
of streamReader()
moet worden geïmplementeerd en simpleStreamReader()
wordt alleen aangeroepen wanneer streamReader()
niet is geïmplementeerd.
DataSourceStreamReader-implementatie
Het streamReader
exemplaar heeft een gehele offset die met 2 toeneemt in elke microbatch, geïmplementeerd via de DataSourceStreamReader
interface.
from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json
class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end
class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}
def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))
Implementatie van SimpleDataSourceStreamReader
Het SimpleStreamReader
exemplaar is hetzelfde als het FakeStreamReader
exemplaar dat twee rijen in elke batch genereert, maar geïmplementeerd met de SimpleDataSourceStreamReader
interface zonder partitionering.
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass
Stap 3: De stream writer implementeren
Implementeer nu de streamingschrijver. Deze streaminggegevensschrijver schrijft de metagegevens van elke microbatch naar een lokaal pad.
class SimpleCommitMessage:
def __init__(self, partition_id: int, count: int):
self.partition_id = partition_id
self.count = count
class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None
def write(self, iterator):
"""
Writes the data, then returns the commit message of that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")
def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")
Stap 4: De voorbeeldgegevensbron registreren en gebruiken
Als u de gegevensbron wilt gebruiken, moet u deze registreren. Nadat het is geregistreerd, kunt u het gebruiken in streamingquery's als bron of sink door een korte naam of volledige naam door te geven aan format()
. In het volgende voorbeeld wordt eerst de gegevensbron geregistreerd, waarna een query wordt gestart die leest uit de gegevensbron in het voorbeeld en uitvoer naar de console genereert.
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()
U kunt ook in het volgende voorbeeld de voorbeeldstroom als sink gebruiken en een uitvoerpad opgeven:
query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")
Probleemoplossing
Als de uitvoer de volgende fout is, biedt uw berekening geen ondersteuning voor aangepaste PySpark-gegevensbronnen. U moet Databricks Runtime 15.2 of hoger gebruiken.
Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000