Niestandardowe źródła danych PySpark
Ważne
Niestandardowe źródła danych PySpark są dostępne w ramach publicznej wersji zapoznawczej w środowisku Databricks Runtime 15.2 lub nowszym oraz w środowisku bezserwerowym w wersji 2. Obsługa przesyłania strumieniowego jest dostępna w środowisku Databricks Runtime 15.3 lub nowszym.
Źródło danych PySpark jest tworzone przez interfejs API źródła danych języka Python (PySpark), który umożliwia odczytywanie z niestandardowych źródeł danych i zapisywanie w niestandardowych ujściach danych na platformie Apache Spark przy użyciu języka Python. Niestandardowe źródła danych PySpark umożliwiają definiowanie niestandardowych połączeń z systemami danych i implementowanie dodatkowych funkcji w celu utworzenia źródeł danych wielokrotnego użytku.
Klasa DataSource
PySpark DataSource to klasa bazowa, która udostępnia metody tworzenia czytników danych i składników zapisywania.
Implementowanie podklasy źródła danych
W zależności od przypadku użycia następujące elementy muszą zostać zaimplementowane przez każdą podklasę, aby źródło danych było czytelne, zapisywalne lub oba:
Właściwość lub metoda | opis |
---|---|
name |
Wymagane. Nazwa źródła danych |
schema |
Wymagane. Schemat źródła danych do odczytu lub zapisu |
reader() |
Musi zwrócić wartość , DataSourceReader aby źródło danych było czytelne (wsadowe) |
writer() |
Musi zwrócić wartość , DataSourceWriter aby ujście danych było zapisywalne (wsadowe) |
streamReader() lub simpleStreamReader() |
Musi zwrócić wartość , DataSourceStreamReader aby strumień danych był czytelny (przesyłanie strumieniowe) |
streamWriter() |
Musi zwrócić wartość , DataSourceStreamWriter aby strumień danych był zapisywalny (przesyłanie strumieniowe) |
Uwaga
Elementy zdefiniowane przez użytkownika DataSource
, DataSourceReader
, DataSourceWriter
, DataSourceStreamReader
, DataSourceStreamWriter
oraz ich metody muszą być możliwe do serializacji. Innymi słowy, muszą być słownikiem lub zagnieżdżonym słownikiem zawierającym typ pierwotny.
Rejestrowanie źródła danych
Po zaimplementowaniu interfejsu należy go zarejestrować, a następnie załadować go lub użyć go w inny sposób, jak pokazano w poniższym przykładzie:
# Register the data source
spark.dataSource.register(MyDataSourceClass)
# Read from a custom data source
spark.read.format("my_datasource_name").load().show()
Przykład 1. Tworzenie źródła danych PySpark dla zapytania wsadowego
Aby zademonstrować możliwości czytnika źródła danych PySpark, utwórz źródło danych, które generuje przykładowe dane przy użyciu faker
pakietu języka Python. Aby uzyskać więcej informacji na temat faker
, zobacz dokumentację Faker.
faker
Zainstaluj pakiet przy użyciu następującego polecenia:
%pip install faker
Krok 1. Definiowanie przykładowego źródła danych
Najpierw zdefiniuj nowe źródło danych PySpark jako podklasę DataSource
o nazwie, schemacie i czytniku. Metoda reader()
musi być zdefiniowana, aby odczytać dane ze źródła w zapytaniu wsadowym.
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""
@classmethod
def name(cls):
return "fake"
def schema(self):
return "name string, date string, zipcode string, state string"
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
Krok 2: Implementacja czytnika dla zapytania wsadowego
Następnie zaimplementuj logikę czytelnika, aby wygenerować przykładowe dane. Użyj zainstalowanej faker
biblioteki, aby wypełnić każde pole w schemacie.
class FakeDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options
def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()
# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)
Krok 3. Rejestrowanie i używanie przykładowego źródła danych
Aby użyć źródła danych, zarejestruj je. Domyślnie element FakeDataSource
ma trzy wiersze, a schemat zawiera następujące string
pola: name
, , date
zipcode
, state
. Poniższy przykład rejestruje, ładuje i generuje przykładowe źródło danych z wartościami domyślnymi:
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+
Obsługiwane są tylko string
pola, ale można określić schemat z dowolnymi polami odpowiadającymi faker
polam dostawców pakietów w celu wygenerowania losowych danych na potrzeby testowania i programowania. Poniższy przykład ładuje źródło danych z polami name
i company
.
spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+
Aby załadować źródło danych z niestandardową liczbą wierszy, określ numRows
opcję. W poniższym przykładzie określono 5 wierszy:
spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+
Przykład 2. Tworzenie źródła danych PySpark na potrzeby przesyłania strumieniowego odczytu i zapisu
pl-PL: Aby zademonstrować możliwości czytnika i zapisu strumienia PySpark DataSource, utwórz przykładowe źródło danych, które generuje dwa wiersze w każdej mikropartycji, korzystając z pakietu faker
języka Python. Aby uzyskać więcej informacji na temat faker
, zobacz dokumentację Faker.
faker
Zainstaluj pakiet przy użyciu następującego polecenia:
%pip install faker
Krok 1. Definiowanie przykładowego źródła danych
Najpierw zdefiniuj nowe źródło danych PySpark jako podklasę DataSource
o nazwie, schemacie i metodach streamReader()
oraz streamWriter()
.
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType
class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""
@classmethod
def name(cls):
return "fakestream"
def schema(self):
return "name string, state string"
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
Krok 2. Implementowanie czytnika strumienia
Następnie zaimplementuj przykładowe odczytywanie danych przesyłania strumieniowego, które generuje dwa wiersze w każdej mikropartii. Można zaimplementować metodę DataSourceStreamReader
lub jeśli źródło danych ma niską przepływność i nie wymaga partycjonowania, możesz zaimplementować SimpleDataSourceStreamReader
zamiast tego. Albo simpleStreamReader()
, albo streamReader()
musi być zaimplementowany, a simpleStreamReader()
jest wywoływany tylko wtedy, gdy streamReader()
nie jest zaimplementowany.
Implementacja DataSourceStreamReader
Wystąpienie streamReader
ma przesunięcie całkowite, które zwiększa się o 2 w każdej mikropartii, zaimplementowane za pomocą interfejsu DataSourceStreamReader
.
from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json
class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end
class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}
def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))
Implementacja SimpleDataSourceStreamReader
Wystąpienie SimpleStreamReader
jest takie samo jak wystąpienie FakeStreamReader
, które generuje dwa wiersze w każdym przetwarzaniu partii, ale jest zaimplementowane z użyciem interfejsu SimpleDataSourceStreamReader
bez partycjonowania.
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass
Krok 3. Implementowanie modułu zapisywania strumienia
Teraz zaimplementuj pisarz strumieniowy. Moduł strumieniowego zapisu danych zapisuje informacje o metadanych każdego mikropakietu do ścieżki lokalnej.
class SimpleCommitMessage:
def __init__(self, partition_id: int, count: int):
self.partition_id = partition_id
self.count = count
class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None
def write(self, iterator):
"""
Writes the data, then returns the commit message of that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")
def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")
Krok 4. Rejestrowanie i używanie przykładowego źródła danych
Aby użyć źródła danych, zarejestruj je. Po zarejestrowaniu można go używać w zapytaniach przesyłania strumieniowego jako źródła lub ujścia, przekazując krótką lub pełną nazwę do format()
. Poniższy przykład rejestruje źródło danych, a następnie uruchamia zapytanie, które odczytuje dane z przykładowego źródła i przekazuje je do konsoli.
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()
Alternatywnie w poniższym przykładzie użyto przykładowego strumienia jako ujścia i określono ścieżkę wyjściową:
query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")
Rozwiązywanie problemów
Jeśli dane wyjściowe są następujące, Twoje środowisko obliczeniowe nie obsługuje niestandardowych źródeł danych PySpark. Musisz użyć środowiska Databricks Runtime 15.2 lub nowszego.
Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000