Aracılığıyla paylaş


PySpark özel veri kaynakları

Önemli

PySpark özel veri kaynakları Databricks Runtime 15.2 ve üzerinde Genel Önizleme aşamasındadır. Akış desteği Databricks Runtime 15.3 ve üzerinde kullanılabilir.

PySpark DataSource, Python (PySpark) DataSource API'si tarafından oluşturulur. Bu API, Python kullanarak Apache Spark'ta özel veri kaynaklarından okuma ve özel veri havuzlarına yazma işlemlerine olanak tanır. PySpark özel veri kaynaklarını kullanarak veri sistemlerine özel bağlantılar tanımlayabilir ve yeniden kullanılabilir veri kaynakları oluşturmak için ek işlevler uygulayabilirsiniz.

DataSource sınıfı

PySpark DataSource , veri okuyucuları ve yazıcılar oluşturmak için yöntemler sağlayan bir temel sınıftır.

Veri kaynağı alt sınıfını uygulama

Kullanım örneğine bağlı olarak, veri kaynağının okunabilir, yazılabilir veya her ikisini birden yapabilmesi için aşağıdakilerin herhangi bir alt sınıf tarafından uygulanması gerekir:

Özellik veya Yöntem Açıklama
name Gerekli. Veri kaynağının adı
schema Gerekli. Okunacak veya yazılacak veri kaynağının şeması
reader() Veri kaynağını okunabilir hale getirmek için bir DataSourceReader döndürmelidir (toplu iş)
writer() Veri havuzu yazılabilir hale getirmek için bir DataSourceWriter döndürmelidir (toplu iş)
streamReader() veya simpleStreamReader() Veri akışını okunabilir hale getirmek için bir DataSourceStreamReader döndürmelidir (akış)
streamWriter() Veri akışını yazılabilir hale getirmek için bir DataSourceStreamWriter döndürmelidir (akış)

Not

Kullanıcı tanımlı DataSource, DataSourceReader, DataSourceWriter, DataSourceStreamReader, , DataSourceStreamWriterve yöntemlerinin seri hale getirilebilmesi gerekir. Başka bir deyişle, ilkel bir tür içeren bir sözlük veya iç içe yerleştirilmiş sözlük olmalıdır.

Veri kaynağını kaydetme

Arabirimi uyguladıktan sonra kaydetmeniz gerekir, ardından aşağıdaki örnekte gösterildiği gibi yükleyebilir veya başka bir şekilde kullanabilirsiniz:

# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

Örnek 1: Toplu sorgu için PySpark DataSource oluşturma

PySpark DataSource okuyucu özelliklerini göstermek için Python paketini kullanarak faker örnek veriler oluşturan bir veri kaynağı oluşturun. hakkında fakerdaha fazla bilgi için Faker belgelerine bakın.

faker Aşağıdaki komutu kullanarak paketi yükleyin:

%pip install faker

1. Adım: DataSource örneğini tanımlama

İlk olarak, yeni PySpark DataSource'unuzu bir ad, şema ve okuyucu ile alt sınıfı DataSource olarak tanımlayın. Bir toplu iş sorgusundaki reader() veri kaynağından okumak için yöntemi tanımlanmalıdır.

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

2. Adım: Toplu sorgu için okuyucuyu uygulama

Ardından, örnek veriler oluşturmak için okuyucu mantığını uygulayın. Şemadaki her alanı doldurmak için yüklü faker kitaplığı kullanın.

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

3. Adım: Örnek veri kaynağını kaydetme ve kullanma

Veri kaynağını kullanmak için kaydedin. Varsayılan olarak, öğesinin FakeDataSource üç satırı vardır ve şema şu string alanları içerir: name, date, zipcode, state. Aşağıdaki örnek, örnek veri kaynağını varsayılan değerlerle kaydeder, yükler ve çıkışını yapar:

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

Yalnızca string alanlar desteklenir, ancak test ve geliştirme için rastgele veri oluşturmak üzere faker paket sağlayıcılarının alanlarına karşılık gelen alanlarla bir şema belirtebilirsiniz. Aşağıdaki örnek, veri kaynağını ve company alanlarıyla name yükler:

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

Veri kaynağını özel sayıda satırla yüklemek için seçeneğini belirtin numRows . Aşağıdaki örnek 5 satır belirtir:

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

Örnek 2: Okuma ve yazma akışı için PySpark DataSource oluşturma

PySpark DataSource akış okuyucu ve yazıcı özelliklerini göstermek için Python paketini kullanarak faker her mikrobatch'te iki satır oluşturan örnek bir veri kaynağı oluşturun. hakkında fakerdaha fazla bilgi için Faker belgelerine bakın.

faker Aşağıdaki komutu kullanarak paketi yükleyin:

%pip install faker

1. Adım: DataSource örneğini tanımlama

İlk olarak, yeni PySpark DataSource'unuzu adı, şeması ve yöntemleri streamReader()streamWriter()ve ile alt sınıfı DataSource olarak tanımlayın.

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

2. Adım: Akış okuyucuyu uygulama

Ardından, her mikrobatch içinde iki satır oluşturan örnek akış veri okuyucusu uygulayın. uygulayabilirsiniz DataSourceStreamReaderveya veri kaynağının aktarım hızı düşükse ve bölümleme gerekmiyorsa bunu uygulayabilirsiniz SimpleDataSourceStreamReader . veya simpleStreamReader()streamReader() uygulanmalıdır ve simpleStreamReader() yalnızca uygulanmadığında streamReader() çağrılır.

DataSourceStreamReader uygulaması

Örneğin streamReader , arabirimiyle DataSourceStreamReader uygulanan her mikrobatchte 2 artan bir tamsayı uzaklığı vardır.

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

SimpleDataSourceStreamReader uygulaması

ÖrnekSimpleStreamReader, her toplu işlemde iki satır oluşturan ancak bölümleme olmadan arabirimle uygulanan örnekle SimpleDataSourceStreamReader aynıdırFakeStreamReader.

class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

3. Adım: Akış yazıcısını uygulama

Şimdi akış yazıcısını uygulayın. Bu akış veri yazıcısı, her mikrobatch'in meta veri bilgilerini yerel bir yola yazar.

class SimpleCommitMessage(WriterCommitMessage):
   partition_id: int
   count: int

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data, then returns the commit message of that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

4. Adım: Örnek veri kaynağını kaydetme ve kullanma

Veri kaynağını kullanmak için kaydedin. Kayıt defterine alındıktan sonra, akış sorgularında kısa bir ad veya tam ad format()geçirerek kaynak veya havuz olarak kullanabilirsiniz. Aşağıdaki örnek, veri kaynağını kaydeder, ardından örnek veri kaynağından okuyan ve konsola çıkış veren bir sorgu başlatır:

spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

Alternatif olarak, aşağıdaki örnek havuz olarak örnek akışı kullanır ve bir çıkış yolu belirtir:

query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

Sorun giderme

Çıkış aşağıdaki hataysa, işleminiz PySpark özel veri kaynaklarını desteklemez. Databricks Runtime 15.2 veya üzerini kullanmanız gerekir.

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000