Fontes de dados personalizadas do PySpark
Importante
As fontes de dados personalizadas do PySpark estão em Visualização Pública no Databricks Runtime 15.2 e superior. O suporte de streaming está disponível no Databricks Runtime 15.3 e superior.
Um PySpark DataSource é criado pela API DataSource Python (PySpark), que permite ler fontes de dados personalizadas e gravar em coletores de dados personalizados no Apache Spark usando Python. Você pode usar fontes de dados personalizadas do PySpark para definir connections personalizadas para sistemas de dados e implementar funcionalidades adicionais, para criar fontes de dados reutilizáveis.
DataSource classe
O PySpark DataSource é uma classe base que fornece métodos para criar leitores e gravadores de dados.
Implementar a subclasse da fonte de dados
Dependendo do seu caso de uso, o seguinte deve ser implementado por qualquer subclasse para tornar uma fonte de dados legível, gravável ou ambas:
Propriedade ou Método | Description |
---|---|
name |
Obrigatório. O nome da fonte de dados |
schema |
Obrigatório. O schema da fonte de dados a ser lido ou gravado |
reader() |
Deve retornar a DataSourceReader para tornar a fonte de dados legível (lote) |
writer() |
Deve retornar a DataSourceWriter para tornar o coletor de dados gravável (lote) |
streamReader() ou simpleStreamReader() |
Deve retornar a DataSourceStreamReader para tornar o fluxo de dados legível (streaming) |
streamWriter() |
Deve retornar a DataSourceStreamWriter para tornar o fluxo de dados gravável (streaming) |
Nota
O , , DataSource
DataSourceReader
, DataSourceWriter
, , DataSourceStreamReader
e seus métodos definidos pelo usuário DataSourceStreamWriter
devem poder ser serializados. Em outras palavras, eles devem ser um dicionário ou dicionário aninhado que contém um tipo primitivo.
Registar a fonte de dados
Depois de implementar a interface, você deve registrá-lo, então você pode carregá-lo ou usá-lo de outra forma, como mostrado no exemplo a seguir:
# Register the data source
spark.dataSource.register(MyDataSourceClass)
# Read from a custom data source
spark.read.format("my_datasource_name").load().show()
Exemplo 1: Criar uma fonte de dados PySpark para consulta em lote
Para demonstrar os recursos do leitor PySpark DataSource, crie uma fonte de dados que gere dados de exemplo usando o faker
pacote Python. Para obter mais informações sobre faker
o , consulte a documentação do Faker.
Instale o faker
pacote usando o seguinte comando:
%pip install faker
Etapa 1: Definir o exemplo DataSource
Primeiro, defina seu novo PySpark DataSource como uma subclasse de DataSource
com nome, schemae leitor. O reader()
método deve ser definido para ler a partir de uma fonte de dados em uma consulta em lotes.
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""
@classmethod
def name(cls):
return "fake"
def schema(self):
return "name string, date string, zipcode string, state string"
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
Etapa 2: Implementar o leitor para uma consulta em lote
Em seguida, implemente a lógica de leitura para os dados de exemplo de generate. Utilize a biblioteca instalada de faker
para preencher cada campo no schema.
class FakeDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options
def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()
# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)
Etapa 3: Registrar e usar a fonte de dados de exemplo
Para usar a fonte de dados, registre-a. Por padrão, o FakeDataSource
tem três linhas e o schema inclui estes string
campos: name
, date
, zipcode
, state
. O exemplo a seguir registra, carrega e gera saídas da fonte de dados de exemplo com os padrões:
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+
Apenas os campos string
são suportados, mas é possível especificar um schema com quaisquer campos que correspondam aos do pacote faker
providerspara gerar generate dados aleatórios para teste e desenvolvimento. O exemplo a seguir carrega a fonte de dados com name
e company
campos:
spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+
Para carregar a fonte de dados com um número personalizado de linhas, especifique a numRows
opção. O exemplo a seguir especifica 5 linhas:
spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+
Exemplo 2: Criar PySpark DataSource para streaming de leitura e gravação
Para demonstrar os recursos de leitor e gravador de fluxo do PySpark DataSource, crie uma fonte de dados de exemplo que gere duas linhas em cada microlote usando o faker
pacote Python. Para obter mais informações sobre faker
o , consulte a documentação do Faker.
Instale o faker
pacote usando o seguinte comando:
%pip install faker
Etapa 1: Definir o exemplo DataSource
Primeiro, defina seu novo PySpark DataSource como uma subclasse de DataSource
com nome, schemae métodos streamReader()
e streamWriter()
.
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType
class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""
@classmethod
def name(cls):
return "fakestream"
def schema(self):
return "name string, state string"
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
Etapa 2: Implementar o leitor de fluxo
Em seguida, implemente o exemplo de leitor de dados de streaming que gera duas linhas em cada microlote. Você pode implementar DataSourceStreamReader
o , ou se a fonte de dados tiver baixa taxa de transferência e não exigir particionamento, você poderá implementá-la SimpleDataSourceStreamReader
. Ou simpleStreamReader()
streamReader()
deve ser implementado, e simpleStreamReader()
só é invocado quando streamReader()
não é implementado.
Implementação de DataSourceStreamReader
A instância streamReader
tem um offset inteiro que aumenta em 2 em cada microlote, implementado com a interface DataSourceStreamReader
.
class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end
class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}
def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))
Implementação do SimpleDataSourceStreamReader
A SimpleStreamReader
instância é a mesma que gera FakeStreamReader
duas linhas em cada lote, mas implementada com a SimpleDataSourceStreamReader
interface sem particionamento.
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass
Etapa 3: Implementar o gravador de fluxo
Agora implemente o gravador de streaming. Este gravador de dados de streaming grava as informações de metadados de cada microlote em um caminho local.
class SimpleCommitMessage(WriterCommitMessage):
partition_id: int
count: int
class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None
def write(self, iterator):
"""
Writes the data, then returns the commit message of that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")
def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")
Etapa 4: Registrar e usar a fonte de dados de exemplo
Para usar a fonte de dados, registre-a. Depois que ele for regsitered, você poderá usá-lo em consultas de streaming como fonte ou coletor passando um nome curto ou completo para format()
. O exemplo a seguir registra a fonte de dados e, em seguida, inicia uma consulta que lê a partir da fonte de dados de exemplo e envia para o console:
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()
Como alternativa, o exemplo a seguir usa o fluxo de exemplo como um coletor e especifica um caminho de saída:
query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")
Resolução de Problemas
Se a saída for o seguinte erro, sua computação não suporta fontes de dados personalizadas do PySpark. Você deve usar o Databricks Runtime 15.2 ou superior.
Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000