通过


数据源

数据源的基类。

此类表示一个自定义数据源,该数据源允许从中读取和/或写入数据。 数据源提供分别用于创建读取器和写入数据的方法。 至少一个方法 reader()writer() 必须由任何子类实现,以使数据源可读或可写(或两者)

实现此接口后,可以使用 加载数据源 spark.read.format(...).load() 并使用 df.write.format(...).save()保存数据。

Syntax

from pyspark.sql.datasource import DataSource

class MyDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_data_source"

参数

参数 类型 说明
options dict 表示此数据源的选项的不区分大小写的字典。

方法

方法 说明
name() 返回一个字符串,表示此数据源的格式名称。 默认情况下,返回类名。 重写以提供自定义的短名称。
schema() 以 DDL 字符串形式 StructType 返回数据源的架构。 如果未实现,并且用户未提供任何架构,则会引发异常。
reader(schema) 返回用于 DataSourceReader 读取数据的实例。 可读数据源是必需的。
writer(schema, overwrite) 返回用于 DataSourceWriter 写入数据的实例。 可写数据源是必需的。
streamWriter(schema, overwrite) 返回将数据 DataSourceStreamWriter 写入流式处理接收器的实例。 可写流式处理数据源是必需的。
simpleStreamReader(schema) 返回用于 SimpleDataSourceStreamReader 读取流数据的实例。 仅在未实现时才 streamReader() 使用。
streamReader(schema) 返回用于 DataSourceStreamReader 读取流数据的实例。 优先于 simpleStreamReader()

示例

定义并注册自定义可读数据源:

from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class MyDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_data_source"

    def schema(self):
        return "a INT, b STRING"

    def reader(self, schema):
        return MyDataSourceReader(schema)

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        yield (1, "hello")
        yield (2, "world")

spark.dataSource.register(MyDataSource)
df = spark.read.format("my_data_source").load()
df.show()

使用 StructType 架构定义数据源:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

class MyDataSource(DataSource):
    def schema(self):
        return StructType().add("a", "int").add("b", "string")