次の方法で共有


データソース

データ ソースの基本クラス。

このクラスは、読み取りや書き込みを可能にするカスタム データ ソースを表します。 データ ソースには、データを読み書きするためのリーダーとライターをそれぞれ作成するメソッドが用意されています。 データ ソースを読み取り可能または書き込み可能にするために、少なくとも 1 つのメソッド reader() または writer() を任意のサブクラスで実装する必要があります。

このインターフェイスを実装した後、 spark.read.format(...).load() を使用してデータ ソースを読み込み、 df.write.format(...).save()を使用してデータを保存できます。

構文

from pyspark.sql.datasource import DataSource

class MyDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_data_source"

パラメーター

パラメーター タイプ 説明
options 辞書 このデータ ソースのオプションを表す、大文字と小文字を区別しないディクショナリ。

メソッド

メソッド 説明
name() このデータ ソースの形式名を表す文字列を返します。 既定では、クラス名を返します。 カスタマイズされた短い名前を指定する場合はオーバーライドします。
schema() データ ソースのスキーマを StructType または DDL 文字列として返します。 実装されておらず、ユーザーによってスキーマが指定されていない場合は、例外がスローされます。
reader(schema) データを読み取るための DataSourceReader インスタンスを返します。 読み取り可能なデータ ソースに必要です。
writer(schema, overwrite) データを書き込むための DataSourceWriter インスタンスを返します。 書き込み可能なデータ ソースに必要です。
streamWriter(schema, overwrite) ストリーミング シンクにデータを書き込むための DataSourceStreamWriter インスタンスを返します。 書き込み可能なストリーミング データ ソースに必要です。
simpleStreamReader(schema) ストリーミング データを読み取るための SimpleDataSourceStreamReader インスタンスを返します。 streamReader()が実装されていない場合にのみ使用されます。
streamReader(schema) ストリーミング データを読み取るための DataSourceStreamReader インスタンスを返します。 simpleStreamReader()よりも優先されます。

例示

カスタムの読み取り可能なデータ ソースを定義して登録します。

from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class MyDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_data_source"

    def schema(self):
        return "a INT, b STRING"

    def reader(self, schema):
        return MyDataSourceReader(schema)

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        yield (1, "hello")
        yield (2, "world")

spark.dataSource.register(MyDataSource)
df = spark.read.format("my_data_source").load()
df.show()

StructType スキーマを使用してデータ ソースを定義します。

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

class MyDataSource(DataSource):
    def schema(self):
        return StructType().add("a", "int").add("b", "string")