Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Sumber data kustom PySpark dibuat menggunakan API DataSource Python (PySpark), yang memungkinkan pembacaan dari sumber data kustom dan menulis ke sink data kustom di Apache Spark menggunakan Python. Anda dapat menggunakan sumber data kustom PySpark untuk menentukan koneksi kustom ke sistem data dan menerapkan fungsionalitas tambahan untuk membangun sumber data yang dapat digunakan kembali.
Catatan
Sumber data kustom PySpark memerlukan Databricks Runtime 15.4 LTS ke atas, atau lingkungan tanpa server versi 2.
Kelas DataSource
PySpark DataSource adalah kelas dasar yang menyediakan metode untuk membuat pembaca dan penulis data.
Menerapkan subkelas sumber data
Bergantung pada kasus penggunaan Anda, berikut ini harus diimplementasikan oleh subkelas apa pun untuk membuat sumber data dapat dibaca, dapat ditulis, atau keduanya:
| Properti atau Metode | Deskripsi |
|---|---|
name |
Harus diisi. Nama sumber data |
schema |
Harus diisi. Skema sumber data yang akan dibaca atau ditulis |
reader() |
Harus mengembalikan DataSourceReader untuk membuat sumber data dapat dibaca (batch) |
writer() |
Harus mengembalikan DataSourceWriter untuk membuat saluran data dapat ditulisi (batch) |
streamReader() atau simpleStreamReader() |
Harus mengembalikan DataSourceStreamReader agar bisa membaca aliran data (streaming) |
streamWriter() |
Harus mengembalikan DataSourceStreamWriter untuk membuat aliran data dapat ditulis (streaming) |
Catatan
Metode yang ditentukan oleh pengguna DataSource, DataSourceReader, DataSourceWriter, DataSourceStreamReader, DataSourceStreamWriter, dan harus dapat diserialisasikan. Dengan kata lain, kamus tersebut harus berupa kamus atau kamus berlapis yang berisi jenis primitif.
Mendaftarkan sumber data
Setelah menerapkan antarmuka, Anda harus mendaftarkannya, lalu Anda dapat memuat atau menggunakannya seperti yang ditunjukkan dalam contoh berikut:
# Register the data source
spark.dataSource.register(MyDataSourceClass)
# Read from a custom data source
spark.read.format("my_datasource_name").load().show()
Contoh 1: Membuat DataSource PySpark untuk kueri batch
Untuk menunjukkan kemampuan pembaca PySpark DataSource, buat sumber data yang menghasilkan contoh data menggunakan faker paket Python. Untuk informasi selengkapnya tentang faker, silakan merujuk ke dokumentasi Faker.
faker Instal paket menggunakan perintah berikut:
%pip install faker
Langkah 1: Menerapkan pembaca data untuk batch query
Pertama, terapkan logika pembaca untuk menghasilkan contoh data. Gunakan pustaka faker yang diinstal untuk mengisi setiap bidang dalam skema.
class FakeDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options
def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()
# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)
Langkah 2: Tentukan contoh DataSource
Selanjutnya, tentukan PySpark DataSource baru Anda sebagai subkelas DataSource dengan nama, skema, dan pembaca. Metode reader() harus didefinisikan untuk membaca dari sumber data dalam kueri batch.
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""
@classmethod
def name(cls):
return "fake"
def schema(self):
return "name string, date string, zipcode string, state string"
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
Langkah 3: Daftarkan dan gunakan contoh sumber data
Untuk menggunakan sumber data, daftarkan. Secara default, FakeDataSource memiliki tiga baris, dan skema menyertakan bidang string ini: name, date, zipcode, state. Contoh berikut mendaftarkan, memuat, dan menghasilkan sumber data contoh dengan default:
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+
Hanya string bidang yang didukung, tetapi Anda dapat menentukan skema dengan bidang apa pun yang sesuai dengan faker bidang penyedia paket untuk menghasilkan data acak untuk pengujian dan pengembangan. Contoh berikut memuat bidang name dan company dalam sumber data:
spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+
Untuk memuat sumber data dengan jumlah baris kustom, tentukan numRows opsi . Contoh berikut menentukan 5 baris:
spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+
Contoh 2: Membuat PySpark GitHub DataSource menggunakan varian
Untuk menunjukkan penggunaan varian dalam PySpark DataSource, contoh ini membuat sumber data yang membaca permintaan pull dari GitHub.
Catatan
Varian didukung dengan sumber data kustom PySpark di Databricks Runtime 17.1 ke atas.
Untuk informasi tentang varian, lihat Kueri data varian.
Langkah 1: Terapkan pembaca untuk mengambil permintaan pull
Pertama, terapkan logika pembaca untuk mengambil permintaan pull dari repositori GitHub yang ditentukan.
class GithubVariantPullRequestReader(DataSourceReader):
def __init__(self, options):
self.token = options.get("token")
self.repo = options.get("path")
if self.repo is None:
raise Exception(f"Must specify a repo in `.load()` method.")
# Every value in this `self.options` dictionary is a string.
self.num_rows = int(options.get("numRows", 10))
def read(self, partition):
header = {
"Accept": "application/vnd.github+json",
}
if self.token is not None:
header["Authorization"] = f"Bearer {self.token}"
url = f"https://api.github.com/repos/{self.repo}/pulls"
response = requests.get(url, headers=header)
response.raise_for_status()
prs = response.json()
for pr in prs[:self.num_rows]:
yield Row(
id = pr.get("number"),
title = pr.get("title"),
user = VariantVal.parseJson(json.dumps(pr.get("user"))),
created_at = pr.get("created_at"),
updated_at = pr.get("updated_at")
)
Langkah 2: Tentukan Sumber Data GitHub
Selanjutnya, tentukan PySpark GitHub DataSource baru Anda sebagai subkelas DataSource dengan nama, skema, dan metode reader(). Skema ini mencakup bidang-bidang ini: id, , title, usercreated_at, updated_at. Bidang user didefinisikan sebagai varian.
import json
import requests
from pyspark.sql import Row
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import VariantVal
class GithubVariantDataSource(DataSource):
@classmethod
def name(self):
return "githubVariant"
def schema(self):
return "id int, title string, user variant, created_at string, updated_at string"
def reader(self, schema):
return GithubVariantPullRequestReader(self.options)
Langkah 3: Daftarkan dan gunakan sumber data
Untuk menggunakan sumber data, daftarkan. Contoh berikut mendaftar, lalu memuat sumber data dan menghasilkan tiga baris data PR repositori GitHub:
spark.dataSource.register(GithubVariantDataSource)
spark.read.format("githubVariant").option("numRows", 3).load("apache/spark").display()
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+
| id | title | user | created_at | updated_at |
+---------+---------------------------------------------------- +---------------------+----------------------+----------------------+
| 51293 |[SPARK-52586][SQL] Introduce AnyTimeType | {"avatar_url":...} | 2025-06-26T09:20:59Z | 2025-06-26T15:22:39Z |
| 51292 |[WIP][PYTHON] Arrow UDF for aggregation | {"avatar_url":...} | 2025-06-26T07:52:27Z | 2025-06-26T07:52:37Z |
| 51290 |[SPARK-50686][SQL] Hash to sort aggregation fallback | {"avatar_url":...} | 2025-06-26T06:19:58Z | 2025-06-26T06:20:07Z |
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+
Contoh 3: Membuat PySpark DataSource untuk streaming baca dan tulis
Untuk menunjukkan kemampuan pembaca aliran dan penulis PySpark DataSource, buat contoh sumber data yang menghasilkan dua baris di setiap microbatch menggunakan faker paket Python. Untuk informasi selengkapnya tentang faker, silakan merujuk ke dokumentasi Faker.
faker Instal paket menggunakan perintah berikut:
%pip install faker
Langkah 1: Menerapkan pembaca aliran
Pertama, terapkan contoh pembaca data streaming yang menghasilkan dua baris di setiap mikrobatch. Anda dapat menerapkan DataSourceStreamReader, atau jika sumber data memiliki throughput rendah dan tidak memerlukan pemartisian, Anda dapat menerapkan SimpleDataSourceStreamReader sebagai gantinya. Baik simpleStreamReader() atau streamReader() harus diimplementasikan, dan simpleStreamReader() hanya dipanggil ketika streamReader() tidak diimplementasikan.
Implementasi DataSourceStreamReader
Instans streamReader memiliki offset integer yang bertambah 2 pada setiap batch mikro, yang diimplementasikan dengan antarmuka DataSourceStreamReader.
from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json
class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end
class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}
def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))
Implementasi SimpleDataSourceStreamReader
SimpleStreamReader Instance sama dengan FakeStreamReader instance yang menghasilkan dua baris di setiap batch, tetapi diimplementasikan tanpa pemartisian menggunakan antarmuka SimpleDataSourceStreamReader.
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass
Langkah 2: Menerapkan penulis data stream
Selanjutnya terapkan penulis streaming. Penulis data streaming ini menulis informasi metadata setiap microbatch ke jalur lokal.
from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage
class SimpleCommitMessage(WriterCommitMessage):
def __init__(self, partition_id: int, count: int):
self.partition_id = partition_id
self.count = count
class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None
def write(self, iterator):
"""
Writes the data and then returns the commit message for that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")
def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")
Langkah 3: Tentukan contoh DataSource
Sekarang tentukan PySpark DataSource baru Anda sebagai subkelas DataSource dengan nama, skema, dan metode streamReader() dan streamWriter().
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType
class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""
@classmethod
def name(cls):
return "fakestream"
def schema(self):
return "name string, state string"
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
Langkah 4: Daftarkan dan gunakan contoh sumber data
Untuk menggunakan sumber data, daftarkan. Setelah terdaftar, Anda dapat menggunakannya dalam kueri streaming sebagai sumber atau sink dengan meneruskan nama pendek atau nama lengkap ke format(). Contoh berikut mendaftarkan sumber data, kemudian memulai kueri yang membaca dari contoh sumber data dan menampilkan output ke konsol.
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()
Atau, kode berikut menggunakan contoh aliran sebagai sink dan menentukan jalur output:
spark.dataSource.register(FakeStreamDataSource)
# Make sure the output directory exists and is writable
output_path = "/output_path"
dbutils.fs.mkdirs(output_path)
checkpoint_path = "/output_path/checkpoint"
query = (
spark.readStream
.format("fakestream")
.load()
.writeStream
.format("fakestream")
.option("path", output_path)
.option("checkpointLocation", checkpoint_path)
.start()
)
Contoh 4: Membuat konektor streaming Google BigQuery
Contoh berikut menunjukkan cara membuat konektor streaming kustom untuk Google BigQuery (BQ) menggunakan PySpark DataSource. Databricks menyediakan konektor Spark untuk penyerapan batch BigQuery, dan Federasi Lakehouse juga dapat terhubung dari jarak jauh ke himpunan data BigQuery dan menarik data melalui pembuatan katalog asing, tetapi tidak sepenuhnya mendukung alur kerja streaming bertahap atau berkelanjutan. Konektor ini memungkinkan migrasi data inkremental berdasarkan fase, dan migrasi nyaris waktu nyata dari tabel BigQuery yang diterima dari sumber streaming dengan pengecekan titik kontrol berkelanjutan.
Konektor kustom ini memiliki fitur berikut:
- Kompatibel dengan Streaming Terstruktur dan Alur Deklaratif Lakeflow Spark.
- Mendukung pelacakan rekaman bertahap dan penyerapan streaming berkelanjutan, dan mengikuti semantik Streaming Terstruktur.
- Menggunakan BIGQuery Storage API dengan protokol berbasis RPC untuk transmisi data yang lebih cepat dan lebih murah.
- Menulis tabel yang dimigrasikan langsung ke Unity Catalog.
- Mengelola titik pemeriksaan secara otomatis menggunakan bidang inkremental berbasis tanggal atau tanda waktu.
- Mendukung penyerapan batch dengan
Trigger.AvailableNow(). - Tidak memerlukan penyimpanan cloud perantara.
- Menserialisasi transmisi data BigQuery menggunakan format Arrow atau Avro.
- Mengelola paralelisme otomatis dan mendistribusikan tugas di antara pekerja Spark berdasarkan volume data.
- Cocok untuk migrasi lapisan Mentah dan Perunggu dari BigQuery, dengan dukungan untuk migrasi lapisan Silver dan Gold menggunakan pola SCD Tipe 1 atau Tipe 2.
Prasyarat
Sebelum menerapkan konektor kustom, instal paket yang diperlukan:
%pip install faker google.cloud google.cloud.bigquery google.cloud.bigquery_storage
Langkah 1: Menerapkan pembaca aliran
Pertama, terapkan pembaca data streaming. Subkelas DataSourceStreamReader harus menerapkan metode berikut:
initialOffset(self) -> dictlatestOffset(self) -> dictpartitions(self, start: dict, end: dict) -> Sequence[InputPartition]read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator[Row]]commit(self, end: dict) -> Nonestop(self) -> None
Untuk detail tentang setiap metode, lihat Metode.
import os
from pyspark.sql.datasource import DataSourceStreamReader, InputPartition
from pyspark.sql.datasource import DataSourceStreamWriter
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.datasource import DataSource
from pathlib import Path
from pyarrow.lib import TimestampScalar
from datetime import datetime
from typing import Iterator, Tuple, Any, Dict, List, Sequence
from google.cloud.bigquery_storage import BigQueryReadClient, ReadSession
from google.cloud import bigquery_storage
import pandas
import datetime
import uuid
import time, logging
start_time = time.time()
class RangePartition(InputPartition):
def __init__(self, session: ReadSession, stream_idx: int):
self.session = session
self.stream_idx = stream_idx
class BQStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.project_id = options.get("project_id")
self.dataset = options.get("dataset")
self.table = options.get("table")
self.json_auth_file = "/home/"+options.get("service_auth_json_file_name")
self.max_parallel_conn = options.get("max_parallel_conn", 1000)
self.incremental_checkpoint_field = options.get("incremental_checkpoint_field", "")
self.last_offset = None
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
from datetime import datetime
logging.info("Inside initialOffset!!!!!")
# self.increment_latest_vals.append(datetime.strptime('1900-01-01 23:57:12', "%Y-%m-%d %H:%M:%S"))
self.last_offset = '1900-01-01 23:57:12'
return {"offset": str(self.last_offset)}
def latestOffset(self):
"""
Returns the current latest offset that the next microbatch will read to.
"""
from datetime import datetime
from google.cloud import bigquery
if (self.last_offset is None):
self.last_offset = '1900-01-01 23:57:12'
client = bigquery.Client.from_service_account_json(self.json_auth_file)
# max_offset=start["offset"]
logging.info(f"************************last_offset: {self.last_offset}***********************")
f_sql_str = ''
for x_str in self.incremental_checkpoint_field.strip().split(","):
f_sql_str += f"{x_str}>'{self.last_offset}' or "
f_sql_str = f_sql_str[:-3]
job_query = client.query(
f"select max({self.incremental_checkpoint_field}) from {self.project_id}.{self.dataset}.{self.table} where {f_sql_str}")
for query in job_query.result():
max_res = query[0]
if (str(max_res).lower() != 'none'):
return {"offset": str(max_res)}
return {"offset": str(self.last_offset)}
def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
if (self.last_offset is None):
self.last_offset = end['offset']
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.json_auth_file
# project_id = self.auth_project_id
client = BigQueryReadClient()
# This example reads baby name data from the public datasets.
table = "projects/{}/datasets/{}/tables/{}".format(
self.project_id, self.dataset, self.table
)
requested_session = bigquery_storage.ReadSession()
requested_session.table = table
if (self.incremental_checkpoint_field != ''):
start_offset = start["offset"]
end_offset = end["offset"]
f_sql_str = ''
for x_str in self.incremental_checkpoint_field.strip().split(","):
f_sql_str += f"({x_str}>'{start_offset}' and {x_str}<='{end_offset}') or "
f_sql_str = f_sql_str[:-3]
requested_session.read_options.row_restriction = f"{f_sql_str}"
# This example leverages Apache Avro.
requested_session.data_format = bigquery_storage.DataFormat.AVRO
parent = "projects/{}".format(self.project_id)
session = client.create_read_session(
request={
"parent": parent,
"read_session": requested_session,
"max_stream_count": int(self.max_parallel_conn),
},
)
self.last_offset = end['offset']
return [RangePartition(session, i) for i in range(len(session.streams))]
def read(self, partition) -> Iterator[List]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
from datetime import datetime
session = partition.session
stream_idx = partition.stream_idx
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.json_auth_file
client_1 = BigQueryReadClient()
# requested_session.read_options.selected_fields = ["census_tract", "clearance_date", "clearance_status"]
reader = client_1.read_rows(session.streams[stream_idx].name)
reader_iter = []
for message in reader.rows():
reader_iter_in = []
for k, v in message.items():
reader_iter_in.append(v)
# yield(reader_iter)
reader_iter.append(reader_iter_in)
# yield (message['hash'], message['size'], message['virtual_size'], message['version'])
# self.increment_latest_vals.append(max_incr_val)
return iter(reader_iter)
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
Langkah 2: Tentukan Sumber Data
Selanjutnya, tentukan sumber data kustom. Subkelas DataSource harus menerapkan metode berikut:
name(cls) -> strschema(self) -> Union[StructType, str]
Untuk detail tentang setiap metode, lihat Metode.
from pyspark.sql.datasource import DataSource
from pyspark.sql.types import StructType
from google.cloud import bigquery
class BQStreamDataSource(DataSource):
"""
An example data source for streaming data from a public API containing users' comments.
"""
@classmethod
def name(cls):
return "bigquery-streaming"
def schema(self):
type_map = {'integer': 'long', 'float': 'double', 'record': 'string'}
json_auth_file = "/home/" + self.options.get("service_auth_json_file_name")
client = bigquery.Client.from_service_account_json(json_auth_file)
table_ref = self.options.get("project_id") + '.' + self.options.get("dataset") + '.' + self.options.get("table")
table = client.get_table(table_ref)
original_schema = table.schema
result = []
for schema in original_schema:
col_attr_name = schema.name
if (schema.mode != 'REPEATED'):
col_attr_type = type_map.get(schema.field_type.lower(), schema.field_type.lower())
else:
col_attr_type = f"array<{type_map.get(schema.field_type.lower(), schema.field_type.lower())}>"
result.append(col_attr_name + " " + col_attr_type)
return ",".join(result)
# return "census_tract double,clearance_date string,clearance_status string"
def streamReader(self, schema: StructType):
return BQStreamReader(schema, self.options)
Langkah 3: Mengonfigurasi dan memulai kueri streaming
Terakhir, daftarkan konektor, lalu konfigurasikan dan mulai kueri streaming:
spark.dataSource.register(BQStreamDataSource)
# Ingests table data incrementally using the provided timestamp-based field.
# The latest value is checkpointed using offset semantics.
# Without the incremental input field, full table ingestion is performed.
# Service account JSON files must be available to every Spark executor worker
# in the /home folder using --files /home/<file_name>.json or an init script.
query = (
spark.readStream.format("bigquery-streaming")
.option("project_id", <bq_project_id>)
.option("incremental_checkpoint_field", <table_incremental_ts_based_col>)
.option("dataset", <bq_dataset_name>)
.option("table", <bq_table_name>)
.option("service_auth_json_file_name", <service_account_json_file_name>)
.option("max_parallel_conn", <max_parallel_threads_to_pull_data>) # defaults to max 1000
.load()
)
(
query.writeStream.trigger(processingTime="30 seconds")
.option("checkpointLocation", "checkpoint_path")
.foreachBatch(writeToTable) # your target table write function
.start()
)
Urutan eksekusi
Urutan eksekusi fungsi dari aliran kustom dijelaskan sebagai berikut.
Untuk memuat aliran DataFrame Spark:
name(cls)
schema()
Untuk microbatch (n) ketika kueri baru dimulai, atau ketika memulai ulang kueri yang sudah ada (cek poin baru atau yang sudah ada):
partitions(end_offset, end_offset) # loads the last saved offset from the checkpoint at query restart
latestOffset()
partitions(start_offset, end_offset) # plans partitions and distributes to Python workers
read() # user’s source read definition, runs on each Python worker
commit()
Untuk mikro-batch (n+1) berikutnya dari kueri yang aktif pada checkpoint yang sudah ada:
latestOffset()
partitions(start_offset, end_offset)
read()
commit()
Catatan
Fungsi ini latestOffset mengatur titik pemeriksaan. Bagikan variabel titik pemeriksaan dari jenis primitif di seluruh fungsi dan kembalikan sebagai kamus. Misalnya: return {"offset": str(self.last_offset)}
Pemecahan Masalah
Jika output adalah kesalahan berikut, komputasi Anda tidak mendukung sumber data kustom PySpark. Anda harus menggunakan Databricks Runtime 15.2 atau lebih tinggi.
Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000