PySpark 自定义数据源

重要

PySpark 自定义数据源在 Databricks Runtime 15.2 及更高版本中以公共预览版提供。 Databricks Runtime 15.3 及更高版本提供流式处理支持。

PySpark DataSource 是由 Python (PySpark) DataSource API 创建的,它允许使用 Python 从自定义数据源读取数据并写入 Apache Spark 中的自定义数据接收器。 可以使用 PySpark 自定义数据源来定义到数据系统的自定义连接,并实现其他功能,以构建可重用的数据源。

DataSource 类

PySpark DataSource 是基类,它提供创建数据读取者和写入者的方法。

实现数据源子类

根据你的用例,任何子类必须实现以下元素,使数据源可读、可写或可读写:

属性或方法 说明
name 必填。 数据源的名称
schema 必填。 要读取或写入的数据源的架构
reader() 必须返回 DataSourceReader 才能使数据源可读(批处理)
writer() 必须返回 DataSourceWriter 才能使数据接收器可写(批处理)
streamReader()simpleStreamReader() 必须返回 DataSourceStreamReader 才能使数据流可读(流式处理)
streamWriter() 必须返回 DataSourceStreamWriter 才能使数据流可写(流式处理)

注意

用户定义的 DataSourceDataSourceReaderDataSourceWriterDataSourceStreamReaderDataSourceStreamWriter 及其方法必须能够序列化。 换言之,它们必须是包含基元类型的字典或嵌套字典。

注册数据源

实现接口后,必须注册它,然后才能加载或以其他方式使用它,如以下示例所示:

# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

示例 1:为批处理查询创建 PySpark DataSource

为了演示 PySpark DataSource 读取器功能,请创建数据源,用于通过 faker Python 包生成示例数据。 有关 faker 的更多信息,请参阅 Faker 文档

使用以下命令安装 faker 包:

%pip install faker

步骤 1:定义示例 DataSource

首先,将新的 PySpark DataSource 定义为 DataSource 的子类,并带有名称、架构和读取器。 必须将 reader() 方法定义为从批处理查询中的数据源读取。

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

步骤 2:实现批处理查询的读取器

接下来,实现读取者逻辑以生成示例数据。 使用已安装的 faker 库填充架构中的每个字段。

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

步骤 3:注册并使用示例数据源

要使用数据源,请对其进行注册。 默认情况下,FakeDataSource 有三行,架构包括以下 string 字段:namedatezipcodestate。 以下示例使用默认值注册、加载和输出示例数据源:

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

只支持 string 字段,但可以指定包含与 faker 包提供程序字段对应的任何字段的架构,以生成用于测试和开发的随机数据。 以下示例加载具有 namecompany 字段的数据源:

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

若要加载具有自定义行数的数据源,请指定 numRows 选项。 以下示例指定了 5 行:

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

示例 2:为流式读取和写入创建 PySpark DataSource

为了演示 PySpark DataSource 流读取器和写入器功能,请创建一个示例数据源,用于通过 faker Python 包在每个微批中生成两行。 有关 faker 的更多信息,请参阅 Faker 文档

使用以下命令安装 faker 包:

%pip install faker

步骤 1:定义示例 DataSource

首先,将新的 PySpark DataSource 定义为 DataSource 的子类,并带有名称、架构以及方法 streamReader()streamWriter()

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

步骤 2:实现流读取器

接下来,实现在每个微批中生成两行的示例流数据读取器。 可以实现 DataSourceStreamReader,或者,如果数据源吞吐量较低且不需要分区,则你可以实现 SimpleDataSourceStreamReader。 必须实现 simpleStreamReader()streamReader(),并且仅当未实现 simpleStreamReader() 时才会调用 streamReader()

DataSourceStreamReader 实现

streamReader 实例具有一个整数偏移量,它在每个微批中递增 2,并通过 DataSourceStreamReader 接口实现。

from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

SimpleDataSourceStreamReader 实现

SimpleStreamReader 实例与 FakeStreamReader 实例相同,在每个批中生成两行,但它是使用 SimpleDataSourceStreamReader 接口实现的,而无需分区。

class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

步骤 3:实现流写入器

现在实现流式处理写入器。 此流式处理数据写入器将每个微批的元数据信息写入本地路径。

class SimpleCommitMessage:
   def __init__(self, partition_id: int, count: int):
       self.partition_id = partition_id
       self.count = count

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data, then returns the commit message of that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

步骤 4:注册并使用示例数据源

要使用数据源,请对其进行注册。 注册后,可以通过将短名或全名传递给 format(),在流式处理查询中将其用作源或接收器。 以下示例注册数据源,然后启动从示例数据源读取并输出到控制台的查询:

spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

以下示例改用示例流作为接收器并指定输出路径:

query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

故障排除

如果输出是以下错误,则表示计算不支持 PySpark 自定义数据源。 必须使用 Databricks Runtime 15.2 或更高版本。

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000