PySpark 自定义数据源
重要
PySpark 自定义数据源在 Databricks Runtime 15.2 及更高版本中以公共预览版提供。 Databricks Runtime 15.3 及更高版本提供流式处理支持。
PySpark DataSource 是由 Python (PySpark) DataSource API 创建的,它允许使用 Python 从自定义数据源读取数据并写入 Apache Spark 中的自定义数据接收器。 可以使用 PySpark 自定义数据源来定义到数据系统的自定义连接,并实现其他功能,以构建可重用的数据源。
DataSource 类
PySpark DataSource 是基类,它提供创建数据读取者和写入者的方法。
实现数据源子类
根据你的用例,任何子类必须实现以下元素,使数据源可读、可写或可读写:
属性或方法 | 说明 |
---|---|
name |
必填。 数据源的名称 |
schema |
必需。 要读取或写入的数据源的架构 |
reader() |
必须返回 DataSourceReader 才能使数据源可读(批处理) |
writer() |
必须返回 DataSourceWriter 才能使数据接收器可写(批处理) |
streamReader() 或 simpleStreamReader() |
必须返回 DataSourceStreamReader 才能使数据流可读(流式处理) |
streamWriter() |
必须返回 DataSourceStreamWriter 才能使数据流可写(流式处理) |
注意
用户定义的 DataSource
、DataSourceReader
、DataSourceWriter
、DataSourceStreamReader
、DataSourceStreamWriter
及其方法必须能够序列化。 换言之,它们必须是包含基元类型的字典或嵌套字典。
注册数据源
实现接口后,必须注册它,然后才能加载或以其他方式使用它,如以下示例所示:
# Register the data source
spark.dataSource.register(MyDataSourceClass)
# Read from a custom data source
spark.read.format("my_datasource_name").load().show()
示例 1:为批处理查询创建 PySpark DataSource
为了演示 PySpark DataSource 读取器功能,请创建数据源,用于通过 faker
Python 包生成示例数据。 有关 faker
的更多信息,请参阅 Faker 文档。
使用以下命令安装 faker
包:
%pip install faker
步骤 1:定义示例 DataSource
首先,将新的 PySpark DataSource 定义为 DataSource
的子类,并带有名称、架构和读取器。 必须将 reader()
方法定义为从批处理查询中的数据源读取。
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""
@classmethod
def name(cls):
return "fake"
def schema(self):
return "name string, date string, zipcode string, state string"
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
步骤 2:实现批处理查询的读取器
接下来,实现读取者逻辑以生成示例数据。 使用已安装的 faker
库填充架构中的每个字段。
class FakeDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options
def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()
# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)
步骤 3:注册并使用示例数据源
要使用数据源,请对其进行注册。 默认情况下,FakeDataSource
有三行,架构包括以下 string
字段:name
、date
、zipcode
、state
。 以下示例使用默认值注册、加载和输出示例数据源:
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+
只支持 string
字段,但可以指定包含与 faker
包提供程序字段对应的任何字段的架构,以生成用于测试和开发的随机数据。 以下示例加载具有 name
和 company
字段的数据源:
spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+
若要加载具有自定义行数的数据源,请指定 numRows
选项。 以下示例指定了 5 行:
spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+
示例 2:为流式读取和写入创建 PySpark DataSource
为了演示 PySpark DataSource 流读取器和写入器功能,请创建一个示例数据源,用于通过 faker
Python 包在每个微批中生成两行。 有关 faker
的更多信息,请参阅 Faker 文档。
使用以下命令安装 faker
包:
%pip install faker
步骤 1:定义示例 DataSource
首先,将新的 PySpark DataSource 定义为 DataSource
的子类,并带有名称、架构以及方法 streamReader()
和 streamWriter()
。
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType
class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""
@classmethod
def name(cls):
return "fakestream"
def schema(self):
return "name string, state string"
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
步骤 2:实现流读取器
接下来,实现在每个微批中生成两行的示例流数据读取器。 可以实现 DataSourceStreamReader
,或者,如果数据源吞吐量较低且不需要分区,则你可以实现 SimpleDataSourceStreamReader
。 必须实现 simpleStreamReader()
或 streamReader()
,并且仅当未实现 streamReader()
时才会调用 simpleStreamReader()
。
DataSourceStreamReader 实现
streamReader
实例具有一个整数偏移量,它在每个微批中递增 2,并通过 DataSourceStreamReader
接口实现。
class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end
class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}
def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))
SimpleDataSourceStreamReader 实现
SimpleStreamReader
实例与 FakeStreamReader
实例相同,在每个批中生成两行,但它是使用 SimpleDataSourceStreamReader
接口实现的,而无需分区。
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass
步骤 3:实现流写入器
现在实现流式处理写入器。 此流式处理数据写入器将每个微批的元数据信息写入本地路径。
class SimpleCommitMessage(WriterCommitMessage):
partition_id: int
count: int
class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None
def write(self, iterator):
"""
Writes the data, then returns the commit message of that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")
def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")
步骤 4:注册并使用示例数据源
要使用数据源,请对其进行注册。 注册后,可以通过将短名或全名传递给 format()
,在流式处理查询中将其用作源或接收器。 以下示例注册数据源,然后启动从示例数据源读取并输出到控制台的查询:
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()
以下示例改用示例流作为接收器并指定输出路径:
query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")
故障排除
如果输出是以下错误,则表示计算不支持 PySpark 自定义数据源。 必须使用 Databricks Runtime 15.2 或更高版本。
Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000