次の方法で共有


PySpark カスタム データ ソース

重要

PySpark カスタム データ ソースは、Databricks Runtime 15.2 以降のパブリック プレビュー段階にあります。 ストリーミングは、Databricks Runtime 15.3 以降でサポートされています。

PySpark DataSource は Python (PySpark) DataSource API によって作成され、これによって、Python を使用した Apache Spark 内のカスタム データ ソースからの読み取りとカスタム データ シンクへの書き込みが可能になります。 PySpark カスタム データ ソースを使用すると、データ システムへのカスタム接続を定義し、追加の機能を実装して、再利用可能なデータ ソースを構築できます。

DataSource クラス

PySpark DataSource は、データ リーダーとライターを作成するメソッドを提供する基底クラスです。

データ ソース サブクラスを実装する

ユース ケースに応じて、データ ソースを読み取り可能か書き込み可能、あるいはその両方にするためには、何らかのサブクラスで実装する必要があります。

プロパティまたはメソッド 説明
name 必須。 データ ソースの名前
schema 必須。 読み取りまたは書き込み対象のデータ ソースのスキーマ
reader() データ ソースを読み取り可能にする DataSourceReader を返す必要があります (バッチ)
writer() データ シンクを書き込み可能にする DataSourceWriter を返す必要があります (バッチ)
streamReader() または simpleStreamReader() データ ストリームを読み取り可能にする DataSourceStreamReader を返す必要があります (ストリーミング)
streamWriter() データ ストリームを書き込み可能にする DataSourceStreamWriter を返す必要があります (ストリーミング)

Note

ユーザー定義の DataSourceDataSourceReaderDataSourceWriterDataSourceStreamReaderDataSourceStreamWriter、およびそのメソッドをシリアル化できる必要があります。 つまり、プリミティブ型を含むディクショナリまたは入れ子になったディクショナリである必要があります。

データ ソースを登録する

インターフェイスを実装したら、それを登録した後、読み込むか、次の例に示すように使用できます。

# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

例 1: バッチ クエリ用の PySpark DataSource を作成する

PySpark DataSource のリーダー機能のデモを行うためにfaker Python パッケージを使用して、サンプル データを生成するデータ ソースを作成します。 faker に関する詳細については、「Faker ドキュメント」を参照してください。

次のコマンドを使用して、faker パッケージをインストールします。

%pip install faker

手順 1: サンプル DataSource を定義する

最初に、新しい PySpark DataSource を名前、スキーマ、リーダーを持つ DataSource のサブクラスとして定義します。 バッチ クエリでデータ ソースからの読み取りを行うには reader() メソッドを定義する必要があります。

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

手順 2: バッチ クエリのリーダーを実装する

次に、リーダー ロジックを実装してサンプル データを生成します。 インストールされている faker ライブラリを使用して、スキーマ内の各フィールドを設定します。

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

手順 3: サンプル データ ソースを登録して使用する

このデータ ソースを使用するために、その登録を行います。 既定では FakeDataSource は 3 つの行を持ち、スキーマには次の string フィールドが含まれます: namedatezipcodestate。 次の例では、既定値を使用してサンプル データ ソースの登録、読み込み、出力を行います。

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

サポートされているのは string フィールドだけですが、faker パッケージ プロバイダーのフィールドに対応する任意のフィールドを持つスキーマを指定して、テストと開発用のランダム データを生成できます。 次の例では、name および company フィールドを持つデータ ソースを読み込みます。

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

カスタムの行数でデータ ソースを読み込むには、numRows オプションを指定します。 次の例では、5 個の行を指定します。

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

例 2: ストリーミング読み取りおよび書き込み用の PySpark DataSource を作成する

PySpark DataSource ストリーム リーダーおよびライター機能のデモを行うために、faker Python パッケージを使用してすべてのマイクロバッチに 2 行を生成するサンプル データ ソースを作成します。 faker に関する詳細については、「Faker ドキュメント」を参照してください。

次のコマンドを使用して、faker パッケージをインストールします。

%pip install faker

手順 1: サンプル DataSource を定義する

最初に、新しい PySpark DataSource を名前、スキーマ、メソッド streamReader() および streamWriter() を持つ DataSource のサブクラスとして定義します。

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

手順 2: ストリーム リーダーを実装する

次に、すべてのマイクロバッチに 2 行を生成するサンプル ストリーミング データ リーダーを実装します。 DataSourceStreamReader を実装できます。または、データ ソースのスループットが低く、パーティション分割を必要としない場合は、代わりに SimpleDataSourceStreamReader を実装できます。 simpleStreamReader() または streamReader() を実装する必要があり、simpleStreamReader()streamReader() が実装されていないときにのみ呼び出されます。

DataSourceStreamReader の実装

streamReader インスタンスには、DataSourceStreamReader インターフェイスで実装される、すべてのマイクロバッチで 2 ずつ増える整数オフセットがあります。

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

SimpleDataSourceStreamReader の実装

SimpleStreamReader インスタンスは、すべてのバッチに 2 行を生成する FakeStreamReader インスタンスと同じですが、パーティション分割なしで SimpleDataSourceStreamReader インターフェイスで実装されます。

class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

手順 3: ストリーム ライターを実装する

次に、ストリーミング ライターを実装します。 このストリーミング データ ライターは、各マイクロバッチのメタデータ情報をローカル パスに書き込みます。

class SimpleCommitMessage(WriterCommitMessage):
   partition_id: int
   count: int

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data, then returns the commit message of that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

手順 4: サンプル データ ソースを登録して使用する

このデータ ソースを使用するために、その登録を行います。 登録されたら、短い名前またはフル ネームを format() に渡して、ストリーミング クエリでソースまたはシンクとして使用できます。 次の例では、データ ソースを登録し、サンプル データ ソースから読み取ってコンソールに出力するクエリを開始します。

spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

または、次の例ではサンプル ストリームをシンクとして使用し、出力パスを指定します。

query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

トラブルシューティング

出力が次のようなエラーとなる場合、お使いのコンピューティングでは PySpark カスタム データ ソースがサポートされていません。 Databricks Runtime 15.2 以上を使用する必要があります。

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000