Пользовательские источники данных PySpark
Внимание
Пользовательские источники данных PySpark находятся в общедоступной предварительной версии в Databricks Runtime 15.2 и выше. Поддержка потоковой передачи доступна в Databricks Runtime 15.3 и выше.
PySpark DataSource создается API-интерфейсом DataSource Python (PySpark), который позволяет читать из пользовательских источников данных и записывать их в пользовательские приемники данных в Apache Spark с помощью Python. Пользовательские источники данных PySpark можно использовать для определения пользовательских подключений к системам данных и реализации дополнительных функций для создания повторно используемых источников данных.
Класс DataSource
PySpark DataSource — это базовый класс, который предоставляет методы для создания средств чтения и записи данных.
Реализация подкласса источника данных
В зависимости от варианта использования любой подкласс должен реализовываться любым подклассом, чтобы сделать источник данных доступным для чтения, записи или обоих:
Свойство или метод | Description |
---|---|
name |
Обязательное. Имя источника данных |
schema |
Обязательный. Схема источника данных для чтения или записи |
reader() |
Должен вернуться, DataSourceReader чтобы источник данных был доступен для чтения (пакет) |
writer() |
Должен вернуться для DataSourceWriter записи приемника данных (пакет) |
streamReader() или simpleStreamReader() |
Необходимо вернуть DataSourceStreamReader значение, чтобы сделать поток данных доступным для чтения (потоковая передача) |
streamWriter() |
Должен возвращать DataSourceStreamWriter объект для записи потока данных (потоковая передача) |
Примечание.
Определяемые DataSource
пользователем , , DataSourceReader
, DataSourceStreamWriter
DataSourceWriter
DataSourceStreamReader
и их методы должны быть сериализованы. Другими словами, они должны быть словарем или вложенным словарем, содержащим примитивный тип.
Регистрация источника данных
После реализации интерфейса необходимо зарегистрировать его, затем можно загрузить или использовать его, как показано в следующем примере:
# Register the data source
spark.dataSource.register(MyDataSourceClass)
# Read from a custom data source
spark.read.format("my_datasource_name").load().show()
Пример 1. Создание источника данных PySpark для пакетного запроса
Чтобы продемонстрировать возможности чтения PySpark DataSource, создайте источник данных, который создает примеры данных с помощью faker
пакета Python. Дополнительные сведения см faker
. в документации по Faker.
Установите пакет с помощью следующей faker
команды:
%pip install faker
Шаг 1. Определение примера DataSource
Сначала определите новый Источник данных PySpark в качестве подкласса DataSource
с именем, схемой и читателем. Метод reader()
должен быть определен для чтения из источника данных в пакетном запросе.
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""
@classmethod
def name(cls):
return "fake"
def schema(self):
return "name string, date string, zipcode string, state string"
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
Шаг 2. Реализация средства чтения для пакетного запроса
Затем реализуйте логику чтения для создания примеров данных. Используйте установленную faker
библиотеку для заполнения каждого поля в схеме.
class FakeDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options
def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()
# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)
Шаг 3. Регистрация и использование примера источника данных
Чтобы использовать источник данных, зарегистрируйте его. По умолчанию имеет FakeDataSource
три строки, а схема включает следующие string
поля: name
, date
, , zipcode
state
. В следующем примере регистрируются, загружаются и выводится пример источника данных по умолчанию:
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+
Поддерживаются только string
поля, но можно указать схему с любыми полями, соответствующими faker
полям поставщиков пакетов для создания случайных данных для тестирования и разработки. В следующем примере загружается источник данных с полямиname
.company
spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+
Чтобы загрузить источник данных с пользовательским числом строк, укажите numRows
этот параметр. В следующем примере указано 5 строк:
spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+
Пример 2. Создание PySpark DataSource для потоковой передачи чтения и записи
Чтобы продемонстрировать возможности средства чтения потоков и записи PySpark DataSource, создайте пример источника данных, который создает две строки в каждом микробатче faker
с помощью пакета Python. Дополнительные сведения см faker
. в документации по Faker.
Установите пакет с помощью следующей faker
команды:
%pip install faker
Шаг 1. Определение примера DataSource
Сначала определите новый PySpark DataSource как подкласс DataSource
с именем, схемой и методамиstreamReader()
.streamWriter()
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType
class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""
@classmethod
def name(cls):
return "fakestream"
def schema(self):
return "name string, state string"
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
Шаг 2. Реализация средства чтения потоков
Затем реализуйте пример средства чтения потоковых данных, который создает две строки в каждом микробатче. Можно реализовать DataSourceStreamReader
или, если источник данных имеет низкую пропускную способность и не требует секционирования, можно реализовать SimpleDataSourceStreamReader
вместо этого. streamReader()
Либо simpleStreamReader()
или должны быть реализованы, и simpleStreamReader()
вызывается только в том случае, если streamReader()
он не реализован.
Реализация DataSourceStreamReader
Экземпляр streamReader
имеет целочисленное смещение, которое увеличивается на 2 в каждом микробатче, реализованном DataSourceStreamReader
с помощью интерфейса.
class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end
class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}
def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))
Реализация SimpleDataSourceStreamReader
Экземпляр SimpleStreamReader
совпадает с экземпляром FakeStreamReader
, который создает две строки в каждом пакете, но реализуется с интерфейсом SimpleDataSourceStreamReader
без секционирования.
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass
Шаг 3. Реализация модуля записи потоков
Теперь реализуйте модуль записи потоковой передачи. Этот модуль записи потоковых данных записывает сведения метаданных каждого микробатча в локальный путь.
class SimpleCommitMessage(WriterCommitMessage):
partition_id: int
count: int
class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None
def write(self, iterator):
"""
Writes the data, then returns the commit message of that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")
def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")
Шаг 4. Регистрация и использование примера источника данных
Чтобы использовать источник данных, зарегистрируйте его. После регрессии его можно использовать в потоковых запросах в качестве источника или приемника, передав короткое имя или полное имя format()
. В следующем примере регистрируется источник данных, а затем запускается запрос, который считывается из примера источника данных и выходных данных в консоль:
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()
Кроме того, в следующем примере в качестве приемника используется пример потока и указывается выходной путь:
query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")
Устранение неполадок
Если выходные данные являются следующей ошибкой, вычислительные ресурсы не поддерживают пользовательские источники данных PySpark. Необходимо использовать Databricks Runtime 15.2 или более поздней версии.
Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000