Поделиться через


Пользовательские источники данных PySpark

Внимание

Пользовательские источники данных PySpark находятся в общедоступной предварительной версии в Databricks Runtime 15.2 и выше. Поддержка потоковой передачи доступна в Databricks Runtime 15.3 и выше.

PySpark DataSource создается API-интерфейсом DataSource Python (PySpark), который позволяет читать из пользовательских источников данных и записывать их в пользовательские приемники данных в Apache Spark с помощью Python. Пользовательские источники данных PySpark можно использовать для определения пользовательских подключений к системам данных и реализации дополнительных функций для создания повторно используемых источников данных.

Класс DataSource

PySpark DataSource — это базовый класс, который предоставляет методы для создания средств чтения и записи данных.

Реализация подкласса источника данных

В зависимости от варианта использования любой подкласс должен реализовываться любым подклассом, чтобы сделать источник данных доступным для чтения, записи или обоих:

Свойство или метод Description
name Обязательное. Имя источника данных
schema Обязательный. Схема источника данных для чтения или записи
reader() Должен вернуться, DataSourceReader чтобы источник данных был доступен для чтения (пакет)
writer() Должен вернуться для DataSourceWriter записи приемника данных (пакет)
streamReader() или simpleStreamReader() Необходимо вернуть DataSourceStreamReader значение, чтобы сделать поток данных доступным для чтения (потоковая передача)
streamWriter() Должен возвращать DataSourceStreamWriter объект для записи потока данных (потоковая передача)

Примечание.

Определяемые DataSourceпользователем , , DataSourceReader, DataSourceStreamWriterDataSourceWriterDataSourceStreamReaderи их методы должны быть сериализованы. Другими словами, они должны быть словарем или вложенным словарем, содержащим примитивный тип.

Регистрация источника данных

После реализации интерфейса необходимо зарегистрировать его, затем можно загрузить или использовать его, как показано в следующем примере:

# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

Пример 1. Создание источника данных PySpark для пакетного запроса

Чтобы продемонстрировать возможности чтения PySpark DataSource, создайте источник данных, который создает примеры данных с помощью faker пакета Python. Дополнительные сведения см faker. в документации по Faker.

Установите пакет с помощью следующей faker команды:

%pip install faker

Шаг 1. Определение примера DataSource

Сначала определите новый Источник данных PySpark в качестве подкласса DataSource с именем, схемой и читателем. Метод reader() должен быть определен для чтения из источника данных в пакетном запросе.

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

Шаг 2. Реализация средства чтения для пакетного запроса

Затем реализуйте логику чтения для создания примеров данных. Используйте установленную faker библиотеку для заполнения каждого поля в схеме.

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

Шаг 3. Регистрация и использование примера источника данных

Чтобы использовать источник данных, зарегистрируйте его. По умолчанию имеет FakeDataSource три строки, а схема включает следующие string поля: name, date, , zipcodestate. В следующем примере регистрируются, загружаются и выводится пример источника данных по умолчанию:

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

Поддерживаются только string поля, но можно указать схему с любыми полями, соответствующими faker полям поставщиков пакетов для создания случайных данных для тестирования и разработки. В следующем примере загружается источник данных с полямиname.company

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

Чтобы загрузить источник данных с пользовательским числом строк, укажите numRows этот параметр. В следующем примере указано 5 строк:

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

Пример 2. Создание PySpark DataSource для потоковой передачи чтения и записи

Чтобы продемонстрировать возможности средства чтения потоков и записи PySpark DataSource, создайте пример источника данных, который создает две строки в каждом микробатче faker с помощью пакета Python. Дополнительные сведения см faker. в документации по Faker.

Установите пакет с помощью следующей faker команды:

%pip install faker

Шаг 1. Определение примера DataSource

Сначала определите новый PySpark DataSource как подкласс DataSource с именем, схемой и методамиstreamReader().streamWriter()

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

Шаг 2. Реализация средства чтения потоков

Затем реализуйте пример средства чтения потоковых данных, который создает две строки в каждом микробатче. Можно реализовать DataSourceStreamReaderили, если источник данных имеет низкую пропускную способность и не требует секционирования, можно реализовать SimpleDataSourceStreamReader вместо этого. streamReader() Либо simpleStreamReader() или должны быть реализованы, и simpleStreamReader() вызывается только в том случае, если streamReader() он не реализован.

Реализация DataSourceStreamReader

Экземпляр streamReader имеет целочисленное смещение, которое увеличивается на 2 в каждом микробатче, реализованном DataSourceStreamReader с помощью интерфейса.

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

Реализация SimpleDataSourceStreamReader

Экземпляр SimpleStreamReader совпадает с экземпляром FakeStreamReader , который создает две строки в каждом пакете, но реализуется с интерфейсом SimpleDataSourceStreamReader без секционирования.

class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

Шаг 3. Реализация модуля записи потоков

Теперь реализуйте модуль записи потоковой передачи. Этот модуль записи потоковых данных записывает сведения метаданных каждого микробатча в локальный путь.

class SimpleCommitMessage(WriterCommitMessage):
   partition_id: int
   count: int

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data, then returns the commit message of that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

Шаг 4. Регистрация и использование примера источника данных

Чтобы использовать источник данных, зарегистрируйте его. После регрессии его можно использовать в потоковых запросах в качестве источника или приемника, передав короткое имя или полное имя format(). В следующем примере регистрируется источник данных, а затем запускается запрос, который считывается из примера источника данных и выходных данных в консоль:

spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

Кроме того, в следующем примере в качестве приемника используется пример потока и указывается выходной путь:

query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

Устранение неполадок

Если выходные данные являются следующей ошибкой, вычислительные ресурсы не поддерживают пользовательские источники данных PySpark. Необходимо использовать Databricks Runtime 15.2 или более поздней версии.

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000