Anpassade datakällor i PySpark
Viktigt!
Anpassade PySpark-datakällor finns i offentlig förhandsversion i Databricks Runtime 15.2 och senare. Direktuppspelningsstöd är tillgängligt i Databricks Runtime 15.3 och senare.
En PySpark DataSource skapas av Python (PySpark) DataSource-API:et, som gör det möjligt att läsa från anpassade datakällor och skriva till anpassade datamottagare i Apache Spark med python. Du kan använda anpassade PySpark-datakällor för att definiera anpassade anslutningar till datasystem och implementera ytterligare funktioner för att skapa återanvändbara datakällor.
DataSource-klass
PySpark DataSource är en basklass som tillhandahåller metoder för att skapa dataläsare och författare.
Implementera underklassen datakälla
Beroende på ditt användningsfall måste följande implementeras av alla underklasser för att göra en datakälla antingen läsbar, skrivbar eller både och:
Egenskap eller metod | beskrivning |
---|---|
name |
Obligatoriskt. Namnet på datakällan |
schema |
Obligatoriskt. Schemat för datakällan som ska läsas eller skrivas |
reader() |
Måste returnera en DataSourceReader för att göra datakällan läsbar (batch) |
writer() |
Måste returnera en DataSourceWriter för att göra datamottagaren skrivbar (batch) |
streamReader() eller simpleStreamReader() |
Måste returnera en DataSourceStreamReader för att göra dataströmmen läsbar (direktuppspelning) |
streamWriter() |
Måste returnera en DataSourceStreamWriter för att göra dataströmmen skrivbar (direktuppspelning) |
Kommentar
De användardefinierade DataSource
metoderna , DataSourceReader
, DataSourceWriter
, DataSourceStreamReader
, DataSourceStreamWriter
och deras metoder måste kunna serialiseras. Med andra ord måste de vara en ordlista eller kapslad ordlista som innehåller en primitiv typ.
Registrera datakällan
När du har implementerat gränssnittet måste du registrera det, sedan kan du läsa in eller på annat sätt använda det som visas i följande exempel:
# Register the data source
spark.dataSource.register(MyDataSourceClass)
# Read from a custom data source
spark.read.format("my_datasource_name").load().show()
Exempel 1: Skapa en PySpark DataSource för batchfråga
Om du vill demonstrera PySpark DataSource-läsarfunktioner skapar du en datakälla som genererar exempeldata med hjälp av Python-paketet faker
. Mer information om faker
finns i Faker-dokumentationen.
faker
Installera paketet med följande kommando:
%pip install faker
Steg 1: Definiera exemplet DataSource
Definiera först din nya PySpark DataSource som en underklass med DataSource
namn, schema och läsare. Metoden reader()
måste definieras för att läsa från en datakälla i en batchfråga.
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""
@classmethod
def name(cls):
return "fake"
def schema(self):
return "name string, date string, zipcode string, state string"
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
Steg 2: Implementera läsaren för en batchfråga
Implementera sedan läsarlogik för att generera exempeldata. Använd det installerade faker
biblioteket för att fylla i varje fält i schemat.
class FakeDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options
def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()
# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)
Steg 3: Registrera och använda exempeldatakällan
Om du vill använda datakällan registrerar du den. Som standard har de FakeDataSource
tre raderna och schemat innehåller följande string
fält: name
, date
, zipcode
, . state
Följande exempel registrerar, läser in och matar ut exempeldatakällan med standardvärdena:
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+
Endast string
fält stöds, men du kan ange ett schema med alla fält som motsvarar faker
paketleverantörernas fält för att generera slumpmässiga data för testning och utveckling. I följande exempel läses datakällan in med name
och company
fält:
spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+
Om du vill läsa in datakällan med ett anpassat antal rader anger du alternativet numRows
. I följande exempel anges 5 rader:
spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+
Exempel 2: Skapa PySpark DataSource för direktuppspelning av läsning och skrivning
Skapa en exempeldatakälla som genererar två rader i varje mikrobatch med hjälp av faker
Python-paketet för att demonstrera strömläsare och skrivarfunktioner i PySpark DataSource. Mer information om faker
finns i Faker-dokumentationen.
faker
Installera paketet med följande kommando:
%pip install faker
Steg 1: Definiera exemplet DataSource
Definiera först din nya PySpark DataSource som en underklass av DataSource
med ett namn, schema och metoder streamReader()
och streamWriter()
.
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType
class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""
@classmethod
def name(cls):
return "fakestream"
def schema(self):
return "name string, state string"
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
Steg 2: Implementera strömläsaren
Implementera sedan exempelläsaren för strömmande data som genererar två rader i varje mikrobatch. Du kan implementera DataSourceStreamReader
, eller om datakällan har lågt dataflöde och inte kräver partitionering, kan du implementera SimpleDataSourceStreamReader
i stället. Antingen simpleStreamReader()
eller streamReader()
måste implementeras och simpleStreamReader()
anropas endast när streamReader()
inte har implementerats.
Implementering av DataSourceStreamReader
Instansen streamReader
har en heltalsförskjutning som ökar med 2 i varje mikrobatch, implementerad med DataSourceStreamReader
gränssnittet.
class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end
class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}
def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))
Implementering av SimpleDataSourceStreamReader
Instansen SimpleStreamReader
är samma som den FakeStreamReader
instans som genererar två rader i varje batch, men implementeras med SimpleDataSourceStreamReader
gränssnittet utan partitionering.
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass
Steg 3: Implementera strömskrivaren
Implementera nu strömningsskrivaren. Den här strömmande dataskrivaren skriver metadatainformationen för varje mikrobatch till en lokal sökväg.
class SimpleCommitMessage(WriterCommitMessage):
partition_id: int
count: int
class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None
def write(self, iterator):
"""
Writes the data, then returns the commit message of that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")
def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")
Steg 4: Registrera och använda exempeldatakällan
Om du vill använda datakällan registrerar du den. När den är regsitered kan du använda den i strömmande frågor som källa eller mottagare genom att skicka ett kort namn eller fullständigt namn till format()
. I följande exempel registreras datakällan och sedan startas en fråga som läser från exempeldatakällan och utdata till konsolen:
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()
I följande exempel används också exempelströmmen som en mottagare och anger en utdatasökväg:
query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")
Felsökning
Om utdata är följande fel stöder din beräkning inte anpassade PySpark-datakällor. Du måste använda Databricks Runtime 15.2 eller senare.
Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000