一個資料來源的基底類別。
此類別代表一個自訂資料來源,允許從其中讀取和/或寫入。 資料來源提供方法,分別建立讀取器與寫入者以讀取與寫入資料。 至少有一個方法 reader() 或 writer() 必須由任何子類別實作,才能使資料來源可讀或可寫入(或兩者兼具)。
實作此介面後,你可以用 spark.read.format(...).load() 載入資料來源,並用 df.write.format(...).save()儲存資料。
語法
from pyspark.sql.datasource import DataSource
class MyDataSource(DataSource):
@classmethod
def name(cls):
return "my_data_source"
參數
| 參數 | 類型 | 說明 |
|---|---|---|
options |
dict | 一個不區分大小寫的字典,代表此資料來源的選項。 |
方法
| 方法 | 說明 |
|---|---|
name() |
回傳一個字串,代表該資料來源的格式名稱。 預設情況下,會回傳類別名稱。 覆蓋以提供客製化的簡短名稱。 |
schema() |
回傳資料來源的結構,以 StructType DDL 字串形式。 若未實作且使用者未提供結構,則會拋出例外。 |
reader(schema) |
回傳 DataSourceReader 一個用於讀取資料的實例。 為可讀資料來源所必需。 |
writer(schema, overwrite) |
回傳 DataSourceWriter 一個用於寫入資料的實例。 這是可寫入資料來源的必要條件。 |
streamWriter(schema, overwrite) |
回傳 DataSourceStreamWriter 一個實例,用於將資料寫入串流匯入。 可寫入串流資料來源的必要條件。 |
simpleStreamReader(schema) |
回傳 SimpleDataSourceStreamReader 一個讀取串流資料的實例。 僅在未實作時 streamReader() 使用。 |
streamReader(schema) |
回傳 DataSourceStreamReader 一個讀取串流資料的實例。 優先於 simpleStreamReader()。 |
Examples
定義並註冊一個自訂可讀的資料來源:
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
class MyDataSource(DataSource):
@classmethod
def name(cls):
return "my_data_source"
def schema(self):
return "a INT, b STRING"
def reader(self, schema):
return MyDataSourceReader(schema)
class MyDataSourceReader(DataSourceReader):
def read(self, partition):
yield (1, "hello")
yield (2, "world")
spark.dataSource.register(MyDataSource)
df = spark.read.format("my_data_source").load()
df.show()
定義一個帶有 StructType 結構的資料來源:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
class MyDataSource(DataSource):
def schema(self):
return StructType().add("a", "int").add("b", "string")