Freigeben über


Benutzerdefinierte PySpark-Datenquellen

Wichtig

Benutzerdefinierte PySpark-Datenquellen befinden sich in der öffentlichen Vorschau in Databricks Runtime 15.2 und höher. Streamingunterstützung ist in der Databricks Runtime 15.3 und höher verfügbar.

Eine PySpark DataSource-Datenquelle wird von der Python (PySpark) DataSource-API erstellt, die das Lesen von benutzerdefinierten Datenquellen und das Schreiben in benutzerdefinierte Datensenken in Apache Spark mithilfe von Python ermöglicht. Mithilfe von benutzerdefinierten PySpark-Datenquellen können Sie benutzerdefinierte Verbindungen mit Datensystemen definieren und zusätzliche Funktionalität implementieren, um wiederverwendbare Datenquellen zu erstellen.

DataSource-Klasse

PySpark DataSource ist eine Basisklasse, die Methoden zum Erstellen von Modulen zum Lesen und Schreiben von Daten bereitstellt.

Implementieren der Datenquellenunterklasse

Abhängig von Ihrem Anwendungsfall muss von jeder Unterklasse Folgendes implementiert werden, damit eine Datenquelle entweder lesbar, schreibbar oder beides ist:

Eigenschaft oder Methode Beschreibung
name Erforderlich. Der Name der Datenquelle.
schema Erforderlich. Das Schema der Datenquelle, die gelesen oder geschrieben werden soll
reader() Muss einen DataSourceReader zurückzugeben, um die Datenquelle lesbar zu machen (Batch)
writer() Muss einen DataSourceWriter zurückzugeben, um die Datensenke schreibbar zu machen (Batch)
streamReader() oder simpleStreamReader() Muss einen DataSourceStreamReader zurückgeben, damit der Datenstrom lesbar ist (Streaming)
streamWriter() Muss einen DataSourceStreamWriter zurückgeben, um den Datenstrom schreibbar zu machen (Streaming)

Hinweis

Die benutzerdefinierten Werte DataSource, DataSourceReader, DataSourceWriter, DataSourceStreamReader, DataSourceStreamWriter und ihre Methoden müssen serialisiert werden können. Mit anderen Worten: Sie müssen ein Wörterbuch oder ein geschachteltes Wörterbuch sein, das einen primitiven Typ enthält.

Registrieren der Datenquelle

Nach der Implementierung der Schnittstelle müssen Sie sie registrieren, und Sie können sie laden oder anderweitig verwenden, wie im folgenden Beispiel gezeigt:

# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

Beispiel 1: Erstellen einer PySpark DataSource-Datenquelle für eine Batchabfrage

Um PySpark DataSource-Lesefunktionen zu veranschaulichen, erstellen Sie eine Datenquelle, die Beispieldaten mithilfe des Python-Pakets faker generiert. Weitere Informationen zu faker finden Sie in der Faker-Dokumentation.

Installieren Sie das Paket faker mit dem folgenden Befehl:

%pip install faker

Schritt 1: Definieren des DataSource-Beispiels

Definieren Sie zunächst Ihre neue PySpark DataSource-Datenquelle als Unterklasse von DataSource mit einem Namen, Schema und Lesemodul. Die reader()-Methode muss definiert werden, um aus einer Datenquelle in einer Batchabfrage zu lesen.

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

Schritt 2: Implementieren des Lesemoduls für eine Batchabfrage

Implementieren Sie als Nächstes die Leselogik, um Beispieldaten zu generieren. Verwenden Sie die installierte faker-Bibliothek, um jedes Feld im Schema auszufüllen.

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

Schritt 3: Registrieren und Verwenden der Beispieldatenquelle

Um die Datenquelle zu verwenden, registrieren Sie sie. Standardmäßig weist FakeDataSource drei Zeilen auf, und das Schema enthält die folgenden string-Felder: name, date, zipcode, state. Im folgenden Beispiel wird die Beispieldatenquelle mit den Standardwerten registriert, geladen und ausgegeben:

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

Nur string-Felder werden unterstützt, aber Sie können ein Schema mit allen Feldern angeben, die den Feldern des faker-Paketanbieters entsprechen, um zufällige Daten für Tests und Entwicklung zu generieren. Im folgenden Beispiel wird die Datenquelle mit den Feldern name und company geladen:

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

Wenn Sie die Datenquelle mit einer benutzerdefinierten Anzahl von Zeilen laden möchten, geben Sie die Option numRows an. Im folgenden Beispiel werden 5 Zeilen angegeben:

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

Beispiel 2: Erstellen von PySpark DataSource zum Streamen von Lese- und Schreibzugriff

Um PySpark DataSource-Streamreader- und -writerfähigkeiten zu veranschaulichen, erstellen Sie eine Beispiel-Datenquelle, die zwei mithilfe des Python-Pakets faker in jedem Mikrobatch zwei Reihen generiert. Weitere Informationen zu faker finden Sie in der Faker-Dokumentation.

Installieren Sie das Paket faker mit dem folgenden Befehl:

%pip install faker

Schritt 1: Definieren des DataSource-Beispiels

Definieren Sie als Nächstes Ihre neue PySpark DataSource-Datenquelle als Unterklasse von DataSource mit einem Namen, Schema und die Methoden streamReader() und streamWriter().

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

Schritt 2: Implementieren des Streamreaders

Implementieren Sie als Nächstes den Beispiel-Streamingdatenleser, der zwei Zeilen in jedem Mikrobatch generiert. Sie können DataSourceStreamReader implementieren oder wenn die Datenquelle einen niedrigen Durchsatz aufweist und keine Partitionierung erfordert, können Sie stattdessen SimpleDataSourceStreamReader implementieren. Es muss entweder simpleStreamReader() oder streamReader() implementiert werden und simpleStreamReader() wird nur aufgerufen, wenn streamReader() nicht implementiert ist.

Implementierung des DataSourceStreamReader

Die streamReader-Instanz weist einen ganzzahligen Offset auf, der bei jedem Mikrobatch, der mit der DataSourceStreamReader-Schnittstelle implementiert wird, um 2 erhöht wird.

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

Implementierung des SimpleDataSourceStreamReader

Die SimpleStreamReader-Instanz ist identisch mit der FakeStreamReader-Instanz, die in jedem Batch zwei Zeilen generiert, aber ohne Partitionierung mit der SimpleDataSourceStreamReader-Schnittstelle implementiert wird.

class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

Schritt 3: Implementieren des Streamwriters

Implementieren Sie nun den Streamingwriter. Dieser Streamingdaten-Writer schreibt die Metadateninformationen der einzelnen Mikrobatches in einen lokalen Pfad.

class SimpleCommitMessage(WriterCommitMessage):
   partition_id: int
   count: int

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data, then returns the commit message of that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

Schritt 4: Registrieren und Verwenden der Beispieldatenquelle

Um die Datenquelle zu verwenden, registrieren Sie sie. Nachdem er registriert wurde, können Sie ihn in Streamingabfragen als Quelle oder Senke verwenden, indem Sie einen Kurz- oder vollständigen Namen an format() übergeben. Das folgende Beispiel registriert die Datenquelle und startet dann eine Abfrage, die aus der Beispieldatenquelle liest und auf der Konsole ausgibt:

spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

Alternativ dazu verwendet das folgende Beispiel den Beispielstream als Senke und gibt einen Ausgabepfad an:

query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

Problembehandlung

Wenn der folgende Fehler ausgegeben wird, unterstützen Ihre Computeressourcen keine benutzerdefinierten PySpark-Datenquellen. Sie müssen Databricks Runtime 15.2 oder höher verwenden.

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000