PySpark özel veri kaynakları
Önemli
PySpark özel veri kaynakları Databricks Runtime 15.2 ve üzerinde Genel Önizleme aşamasındadır. Akış desteği Databricks Runtime 15.3 ve üzerinde kullanılabilir.
PySpark DataSource, Python (PySpark) DataSource API'si tarafından oluşturulur. Bu API, Python kullanarak Apache Spark'ta özel veri kaynaklarından okuma ve özel veri havuzlarına yazma işlemlerine olanak tanır. PySpark özel veri kaynaklarını kullanarak veri sistemlerine özel bağlantılar tanımlayabilir ve yeniden kullanılabilir veri kaynakları oluşturmak için ek işlevler uygulayabilirsiniz.
DataSource sınıfı
PySpark DataSource , veri okuyucuları ve yazıcılar oluşturmak için yöntemler sağlayan bir temel sınıftır.
Veri kaynağı alt sınıfını uygulama
Kullanım örneğine bağlı olarak, veri kaynağının okunabilir, yazılabilir veya her ikisini birden yapabilmesi için aşağıdakilerin herhangi bir alt sınıf tarafından uygulanması gerekir:
Özellik veya Yöntem | Açıklama |
---|---|
name |
Gerekli. Veri kaynağının adı |
schema |
Gerekli. Okunacak veya yazılacak veri kaynağının şeması |
reader() |
Veri kaynağını okunabilir hale getirmek için bir DataSourceReader döndürmelidir (toplu iş) |
writer() |
Veri havuzu yazılabilir hale getirmek için bir DataSourceWriter döndürmelidir (toplu iş) |
streamReader() veya simpleStreamReader() |
Veri akışını okunabilir hale getirmek için bir DataSourceStreamReader döndürmelidir (akış) |
streamWriter() |
Veri akışını yazılabilir hale getirmek için bir DataSourceStreamWriter döndürmelidir (akış) |
Not
Kullanıcı tanımlı DataSource
, DataSourceReader
, DataSourceWriter
, DataSourceStreamReader
, , DataSourceStreamWriter
ve yöntemlerinin seri hale getirilebilmesi gerekir. Başka bir deyişle, ilkel bir tür içeren bir sözlük veya iç içe yerleştirilmiş sözlük olmalıdır.
Veri kaynağını kaydetme
Arabirimi uyguladıktan sonra kaydetmeniz gerekir, ardından aşağıdaki örnekte gösterildiği gibi yükleyebilir veya başka bir şekilde kullanabilirsiniz:
# Register the data source
spark.dataSource.register(MyDataSourceClass)
# Read from a custom data source
spark.read.format("my_datasource_name").load().show()
Örnek 1: Toplu sorgu için PySpark DataSource oluşturma
PySpark DataSource okuyucu özelliklerini göstermek için Python paketini kullanarak faker
örnek veriler oluşturan bir veri kaynağı oluşturun. hakkında faker
daha fazla bilgi için Faker belgelerine bakın.
faker
Aşağıdaki komutu kullanarak paketi yükleyin:
%pip install faker
1. Adım: DataSource örneğini tanımlama
İlk olarak, yeni PySpark DataSource'unuzu bir ad, şema ve okuyucu ile alt sınıfı DataSource
olarak tanımlayın. Bir toplu iş sorgusundaki reader()
veri kaynağından okumak için yöntemi tanımlanmalıdır.
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""
@classmethod
def name(cls):
return "fake"
def schema(self):
return "name string, date string, zipcode string, state string"
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
2. Adım: Toplu sorgu için okuyucuyu uygulama
Ardından, örnek veriler oluşturmak için okuyucu mantığını uygulayın. Şemadaki her alanı doldurmak için yüklü faker
kitaplığı kullanın.
class FakeDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options
def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()
# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)
3. Adım: Örnek veri kaynağını kaydetme ve kullanma
Veri kaynağını kullanmak için kaydedin. Varsayılan olarak, öğesinin FakeDataSource
üç satırı vardır ve şema şu string
alanları içerir: name
, date
, zipcode
, state
. Aşağıdaki örnek, örnek veri kaynağını varsayılan değerlerle kaydeder, yükler ve çıkışını yapar:
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+
Yalnızca string
alanlar desteklenir, ancak test ve geliştirme için rastgele veri oluşturmak üzere faker
paket sağlayıcılarının alanlarına karşılık gelen alanlarla bir şema belirtebilirsiniz. Aşağıdaki örnek, veri kaynağını ve company
alanlarıyla name
yükler:
spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+
Veri kaynağını özel sayıda satırla yüklemek için seçeneğini belirtin numRows
. Aşağıdaki örnek 5 satır belirtir:
spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+
Örnek 2: Okuma ve yazma akışı için PySpark DataSource oluşturma
PySpark DataSource akış okuyucu ve yazıcı özelliklerini göstermek için Python paketini kullanarak faker
her mikrobatch'te iki satır oluşturan örnek bir veri kaynağı oluşturun. hakkında faker
daha fazla bilgi için Faker belgelerine bakın.
faker
Aşağıdaki komutu kullanarak paketi yükleyin:
%pip install faker
1. Adım: DataSource örneğini tanımlama
İlk olarak, yeni PySpark DataSource'unuzu adı, şeması ve yöntemleri streamReader()
streamWriter()
ve ile alt sınıfı DataSource
olarak tanımlayın.
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType
class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""
@classmethod
def name(cls):
return "fakestream"
def schema(self):
return "name string, state string"
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
2. Adım: Akış okuyucuyu uygulama
Ardından, her mikrobatch içinde iki satır oluşturan örnek akış veri okuyucusu uygulayın. uygulayabilirsiniz DataSourceStreamReader
veya veri kaynağının aktarım hızı düşükse ve bölümleme gerekmiyorsa bunu uygulayabilirsiniz SimpleDataSourceStreamReader
. veya simpleStreamReader()
streamReader()
uygulanmalıdır ve simpleStreamReader()
yalnızca uygulanmadığında streamReader()
çağrılır.
DataSourceStreamReader uygulaması
Örneğin streamReader
, arabirimiyle DataSourceStreamReader
uygulanan her mikrobatchte 2 artan bir tamsayı uzaklığı vardır.
class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end
class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}
def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))
SimpleDataSourceStreamReader uygulaması
ÖrnekSimpleStreamReader
, her toplu işlemde iki satır oluşturan ancak bölümleme olmadan arabirimle uygulanan örnekle SimpleDataSourceStreamReader
aynıdırFakeStreamReader
.
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass
3. Adım: Akış yazıcısını uygulama
Şimdi akış yazıcısını uygulayın. Bu akış veri yazıcısı, her mikrobatch'in meta veri bilgilerini yerel bir yola yazar.
class SimpleCommitMessage(WriterCommitMessage):
partition_id: int
count: int
class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None
def write(self, iterator):
"""
Writes the data, then returns the commit message of that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")
def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")
4. Adım: Örnek veri kaynağını kaydetme ve kullanma
Veri kaynağını kullanmak için kaydedin. Kayıt defterine alındıktan sonra, akış sorgularında kısa bir ad veya tam ad format()
geçirerek kaynak veya havuz olarak kullanabilirsiniz. Aşağıdaki örnek, veri kaynağını kaydeder, ardından örnek veri kaynağından okuyan ve konsola çıkış veren bir sorgu başlatır:
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()
Alternatif olarak, aşağıdaki örnek havuz olarak örnek akışı kullanır ve bir çıkış yolu belirtir:
query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")
Sorun giderme
Çıkış aşağıdaki hataysa, işleminiz PySpark özel veri kaynaklarını desteklemez. Databricks Runtime 15.2 veya üzerini kullanmanız gerekir.
Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000
Geri Bildirim
https://aka.ms/ContentUserFeedback.
Çok yakında: 2024 boyunca, içerik için geri bildirim mekanizması olarak GitHub Sorunları’nı kullanımdan kaldıracak ve yeni bir geri bildirim sistemiyle değiştireceğiz. Daha fazla bilgi için bkz.Gönderin ve geri bildirimi görüntüleyin