PySpark 사용자 지정 데이터 원본은 Python(PySpark) DataSource API를 사용하여 만들어지며, 이를 통해 사용자 지정 데이터 원본에서 읽고 Python을 사용하여 Apache Spark의 사용자 지정 데이터 싱크에 쓸 수 있습니다. PySpark 사용자 지정 데이터 원본을 사용하여 데이터 시스템에 대한 사용자 지정 연결을 정의하고 재사용 가능한 데이터 원본을 빌드하는 추가 기능을 구현할 수 있습니다.
참고
PySpark 사용자 지정 데이터 원본에는 Databricks Runtime 15.4 LTS 이상 또는 서버리스 환경 버전 2가 필요합니다.
DataSource 클래스
PySpark DataSource 는 데이터 판독기 및 작성기를 만드는 메서드를 제공하는 기본 클래스입니다.
데이터 원본 하위 클래스 구현
사용 사례에 따라 데이터 원본을 읽을 수 있거나 쓰기 가능하거나 둘 다 만들려면 하위 클래스에서 다음을 구현해야 합니다.
| 속성 또는 메서드 | 설명 |
|---|---|
name |
필수입니다. 데이터 원본의 이름 |
schema |
필수입니다. 읽거나 쓸 데이터 원본의 스키마 |
reader() |
데이터 원본을 읽을 수 있도록 하려면 DataSourceReader(일괄 처리)를 반환해야 합니다. |
writer() |
데이터 싱크를 쓰기 가능하게(일괄 처리) 하려면 DataSourceWriter 를 반환해야 합니다. |
streamReader() 또는 simpleStreamReader() |
DataSourceStreamReader을 반환하여 데이터 스트림을 읽을 수 있게 해야 합니다(스트리밍). |
streamWriter() |
데이터 스트림을 쓰기 가능(스트리밍)하려면 DataSourceStreamWriter를 반환해야 합니다. |
참고
사용자 정의 DataSource, , DataSourceReader, DataSourceWriterDataSourceStreamReaderDataSourceStreamWriter및 해당 메서드를 직렬화할 수 있어야 합니다. 즉, 기본 형식을 포함하는 사전 또는 중첩된 사전이어야 합니다.
데이터 원본 등록
인터페이스를 구현한 후 등록해야 하며, 다음 예제와 같이 로드하거나 사용할 수 있습니다.
# Register the data source
spark.dataSource.register(MyDataSourceClass)
# Read from a custom data source
spark.read.format("my_datasource_name").load().show()
예제 1: 일괄 처리 쿼리를 위한 PySpark DataSource 만들기
PySpark DataSource 판독기 기능을 보여 주려면 Python 패키지를 사용하여 예제 데이터를 생성하는 데이터 원본을 faker 만듭니다. 자세한 faker내용은 Faker 설명서를 참조 하세요.
다음 명령을 사용하여 faker 패키지를 설치합니다.
%pip install faker
1단계: 일괄 처리 쿼리에 대한 판독기 구현
먼저 예제 데이터를 생성하는 판독기 논리를 구현합니다. 설치된 faker 라이브러리를 사용하여 스키마의 각 필드를 채웁다.
class FakeDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options
def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()
# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)
2단계: 예제 DataSource 정의
다음으로, 새 PySpark DataSource를 이름, 스키마 및 판독기를 사용하는 하위 클래스 DataSource 로 정의합니다.
reader() 일괄 처리 쿼리의 데이터 원본에서 읽도록 메서드를 정의해야 합니다.
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""
@classmethod
def name(cls):
return "fake"
def schema(self):
return "name string, date string, zipcode string, state string"
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
3단계: 예제 데이터 원본 등록 및 사용
데이터 원본을 사용하려면 데이터 원본을 등록합니다. 기본적으로 FakeDataSource 세 개의 행이 있으며 스키마에는 string, name, date, zipcodestate 필드가 포함됩니다. 다음 예제에서는 기본값을 사용하여 예제 데이터 원본을 등록, 로드 및 출력합니다.
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+
string 필드만 지원되지만, faker 패키지 공급자의 필드에 해당하는 임의의 필드가 포함된 스키마를 지정하여 테스트 및 개발을 위한 임의 데이터를 생성할 수 있습니다. 다음 예제에서는 name 및 company 필드를 포함하여 데이터 원본을 로드합니다.
spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+
사용자 지정 행 수를 사용하여 데이터 원본을 로드하려면 옵션을 지정 numRows 합니다. 다음 예제에서는 5개의 행을 지정합니다.
spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+
예제 2: 변형을 사용하여 PySpark GitHub DataSource 만들기
PySpark DataSource에서 변형을 사용하는 방법을 보여 주는 이 예제에서는 GitHub에서 끌어오기 요청을 읽는 데이터 원본을 만듭니다.
참고
Variant는 Databricks Runtime 17.1 이상에서 PySpark 사용자 지정 데이터 원본에서 지원됩니다.
변형에 대한 자세한 내용은 쿼리 변형 데이터를 참조하세요.
1단계: 끌어오기 요청을 검색하는 판독기 구현
먼저 지정된 GitHub 리포지토리에서 끌어오기 요청을 검색하는 판독기 논리를 구현합니다.
class GithubVariantPullRequestReader(DataSourceReader):
def __init__(self, options):
self.token = options.get("token")
self.repo = options.get("path")
if self.repo is None:
raise Exception(f"Must specify a repo in `.load()` method.")
# Every value in this `self.options` dictionary is a string.
self.num_rows = int(options.get("numRows", 10))
def read(self, partition):
header = {
"Accept": "application/vnd.github+json",
}
if self.token is not None:
header["Authorization"] = f"Bearer {self.token}"
url = f"https://api.github.com/repos/{self.repo}/pulls"
response = requests.get(url, headers=header)
response.raise_for_status()
prs = response.json()
for pr in prs[:self.num_rows]:
yield Row(
id = pr.get("number"),
title = pr.get("title"),
user = VariantVal.parseJson(json.dumps(pr.get("user"))),
created_at = pr.get("created_at"),
updated_at = pr.get("updated_at")
)
2단계: GitHub DataSource 정의
다음으로, 새 PySpark GitHub DataSource를 DataSource의 하위 클래스로 정의하고, 이름, 스키마, 메서드 reader()를 설정합니다. 스키마에는 다음 필드가 idtitleusercreated_at포함됩니다. updated_at
user 필드는 변형으로 정의됩니다.
import json
import requests
from pyspark.sql import Row
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import VariantVal
class GithubVariantDataSource(DataSource):
@classmethod
def name(self):
return "githubVariant"
def schema(self):
return "id int, title string, user variant, created_at string, updated_at string"
def reader(self, schema):
return GithubVariantPullRequestReader(self.options)
3단계: 데이터 원본 등록 및 사용
데이터 원본을 사용하려면 데이터 원본을 등록합니다. 다음 예제에서는 데이터 원본을 등록한 다음 GitHub 리포지토리 PR 데이터의 세 행을 출력합니다.
spark.dataSource.register(GithubVariantDataSource)
spark.read.format("githubVariant").option("numRows", 3).load("apache/spark").display()
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+
| id | title | user | created_at | updated_at |
+---------+---------------------------------------------------- +---------------------+----------------------+----------------------+
| 51293 |[SPARK-52586][SQL] Introduce AnyTimeType | {"avatar_url":...} | 2025-06-26T09:20:59Z | 2025-06-26T15:22:39Z |
| 51292 |[WIP][PYTHON] Arrow UDF for aggregation | {"avatar_url":...} | 2025-06-26T07:52:27Z | 2025-06-26T07:52:37Z |
| 51290 |[SPARK-50686][SQL] Hash to sort aggregation fallback | {"avatar_url":...} | 2025-06-26T06:19:58Z | 2025-06-26T06:20:07Z |
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+
예제 3: 읽기 및 쓰기 스트리밍을 위한 PySpark DataSource 만들기
PySpark DataSource 스트림 판독기 및 기록기 기능을 보여 주려면 Python 패키지를 사용하여 모든 마이크로배치에서 두 개의 행을 생성하는 예제 데이터 원본을 faker 만듭니다. 자세한 faker내용은 Faker 설명서를 참조 하세요.
다음 명령을 사용하여 faker 패키지를 설치합니다.
%pip install faker
1단계: 스트림 판독기 구현
먼저 모든 마이크로배치에서 두 개의 행을 생성하는 스트리밍 데이터 판독기 예제를 구현합니다. 구현 DataSourceStreamReader하거나 데이터 원본의 처리량이 낮고 분할이 필요하지 않은 경우 대신 구현 SimpleDataSourceStreamReader 할 수 있습니다.
simpleStreamReader() 또는 streamReader() 중 하나는 구현되어야 하며, simpleStreamReader()이 구현되지 않은 경우에만 streamReader()가 호출됩니다.
DataSourceStreamReader 구현
streamReader 인스턴스에는 DataSourceStreamReader 인터페이스로 구현된 정수 오프셋이 있으며, 이는 모든 마이크로배치에서 2씩 증가합니다.
from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json
class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end
class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}
def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))
SimpleDataSourceStreamReader 구현
SimpleStreamReader 인스턴스는 분할 없이 FakeStreamReader 인터페이스로 구현된 것으로, 모든 일괄 처리에서 두 개 행을 생성하는 SimpleDataSourceStreamReader 인스턴스와 동일합니다.
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass
2단계: 스트림 작성기 구현
다음으로 스트리밍 기록기를 구현합니다. 이 스트리밍 데이터 작성기는 각 마이크로배치의 메타데이터 정보를 로컬 경로에 씁니다.
from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage
class SimpleCommitMessage(WriterCommitMessage):
def __init__(self, partition_id: int, count: int):
self.partition_id = partition_id
self.count = count
class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None
def write(self, iterator):
"""
Writes the data and then returns the commit message for that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")
def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")
3단계: 예제 DataSource 정의
이제 새 PySpark DataSource를 이름, 스키마 및 메서드 DataSource 와 함께 서브클래스 streamReader() 로 정의합니다streamWriter().
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType
class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""
@classmethod
def name(cls):
return "fakestream"
def schema(self):
return "name string, state string"
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
4단계: 예제 데이터 원본 등록 및 사용
데이터 원본을 사용하려면 데이터 원본을 등록합니다. 등록한 후에는 짧은 이름 또는 전체 이름을 format()전달하여 스트리밍 쿼리에서 원본 또는 싱크로 사용할 수 있습니다. 다음 예제에서는 데이터 원본을 등록한 다음 예제 데이터 원본에서 읽고 콘솔로 출력하는 쿼리를 시작합니다.
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()
또는 다음 코드는 예제 스트림을 싱크로 사용하고 출력 경로를 지정합니다.
spark.dataSource.register(FakeStreamDataSource)
# Make sure the output directory exists and is writable
output_path = "/output_path"
dbutils.fs.mkdirs(output_path)
checkpoint_path = "/output_path/checkpoint"
query = (
spark.readStream
.format("fakestream")
.load()
.writeStream
.format("fakestream")
.option("path", output_path)
.option("checkpointLocation", checkpoint_path)
.start()
)
예제 4: Google BigQuery 스트리밍 커넥터 만들기
다음 예제에서는 PySpark DataSource를 사용하여 BQ(Google BigQuery)용 사용자 지정 스트리밍 커넥터를 빌드하는 방법을 보여 줍니다. Databricks는 BigQuery 일괄 처리 수집을 위한 Spark 커넥터 를 제공하며, Lakehouse 페더레이션 은 모든 BigQuery 데이터 집합에 원격으로 연결하고 외국 카탈로그 생성을 통해 데이터를 끌어올 수도 있지만 증분 또는 연속 스트리밍 워크플로를 완전히 지원하지는 않습니다. 이 커넥터를 사용하면 단계별 증분 데이터 마이그레이션 및 영구 검사점이 있는 스트리밍 원본에서 공급되는 BigQuery 테이블에서 거의 실시간으로 마이그레이션할 수 있습니다.
이 사용자 지정 커넥터에는 다음과 같은 기능이 있습니다.
- 구조적 스트리밍 및 Lakeflow Spark 선언적 파이프라인과 호환됩니다.
- 증분 레코드 추적 및 연속 스트리밍 수집을 지원하고 구조적 스트리밍 의미 체계를 따릅니다.
- 더 빠르고 저렴한 데이터 전송을 위해 RPC 기반 프로토콜과 함께 BigQuery Storage API를 사용합니다.
- 마이그레이션된 테이블을 Unity 카탈로그에 직접 씁니다.
- 날짜 또는 타임스탬프 기반 증분 필드를 사용하여 검사점을 자동으로 관리합니다.
-
Trigger.AvailableNow()을 지원하여 일괄 수집을 처리합니다. - 중간 클라우드 스토리지가 필요하지 않습니다.
- 화살표 또는 Avro 형식을 사용하여 BigQuery 데이터 전송을 직렬화합니다.
- 자동 병렬 처리를 처리하고 데이터 볼륨에 따라 Spark 작업자 간에 작업을 분산합니다.
- SCD Type 1 또는 Type 2 패턴을 사용하여 Silver 및 Gold 계층 마이그레이션을 지원하는 BigQuery에서 원시 및 브론즈 계층 마이그레이션에 적합합니다.
사전 요구 사항
사용자 지정 커넥터를 구현하기 전에 필요한 패키지를 설치합니다.
%pip install faker google.cloud google.cloud.bigquery google.cloud.bigquery_storage
1단계: 스트림 판독기 구현
먼저 스트리밍 데이터 판독기를 구현합니다. 하위 클래스는 DataSourceStreamReader 다음 메서드를 구현해야 합니다.
initialOffset(self) -> dictlatestOffset(self) -> dictpartitions(self, start: dict, end: dict) -> Sequence[InputPartition]read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator[Row]]commit(self, end: dict) -> Nonestop(self) -> None
각 메서드에 대한 자세한 내용은 메서드를 참조 하세요.
import os
from pyspark.sql.datasource import DataSourceStreamReader, InputPartition
from pyspark.sql.datasource import DataSourceStreamWriter
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.datasource import DataSource
from pathlib import Path
from pyarrow.lib import TimestampScalar
from datetime import datetime
from typing import Iterator, Tuple, Any, Dict, List, Sequence
from google.cloud.bigquery_storage import BigQueryReadClient, ReadSession
from google.cloud import bigquery_storage
import pandas
import datetime
import uuid
import time, logging
start_time = time.time()
class RangePartition(InputPartition):
def __init__(self, session: ReadSession, stream_idx: int):
self.session = session
self.stream_idx = stream_idx
class BQStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.project_id = options.get("project_id")
self.dataset = options.get("dataset")
self.table = options.get("table")
self.json_auth_file = "/home/"+options.get("service_auth_json_file_name")
self.max_parallel_conn = options.get("max_parallel_conn", 1000)
self.incremental_checkpoint_field = options.get("incremental_checkpoint_field", "")
self.last_offset = None
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
from datetime import datetime
logging.info("Inside initialOffset!!!!!")
# self.increment_latest_vals.append(datetime.strptime('1900-01-01 23:57:12', "%Y-%m-%d %H:%M:%S"))
self.last_offset = '1900-01-01 23:57:12'
return {"offset": str(self.last_offset)}
def latestOffset(self):
"""
Returns the current latest offset that the next microbatch will read to.
"""
from datetime import datetime
from google.cloud import bigquery
if (self.last_offset is None):
self.last_offset = '1900-01-01 23:57:12'
client = bigquery.Client.from_service_account_json(self.json_auth_file)
# max_offset=start["offset"]
logging.info(f"************************last_offset: {self.last_offset}***********************")
f_sql_str = ''
for x_str in self.incremental_checkpoint_field.strip().split(","):
f_sql_str += f"{x_str}>'{self.last_offset}' or "
f_sql_str = f_sql_str[:-3]
job_query = client.query(
f"select max({self.incremental_checkpoint_field}) from {self.project_id}.{self.dataset}.{self.table} where {f_sql_str}")
for query in job_query.result():
max_res = query[0]
if (str(max_res).lower() != 'none'):
return {"offset": str(max_res)}
return {"offset": str(self.last_offset)}
def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
if (self.last_offset is None):
self.last_offset = end['offset']
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.json_auth_file
# project_id = self.auth_project_id
client = BigQueryReadClient()
# This example reads baby name data from the public datasets.
table = "projects/{}/datasets/{}/tables/{}".format(
self.project_id, self.dataset, self.table
)
requested_session = bigquery_storage.ReadSession()
requested_session.table = table
if (self.incremental_checkpoint_field != ''):
start_offset = start["offset"]
end_offset = end["offset"]
f_sql_str = ''
for x_str in self.incremental_checkpoint_field.strip().split(","):
f_sql_str += f"({x_str}>'{start_offset}' and {x_str}<='{end_offset}') or "
f_sql_str = f_sql_str[:-3]
requested_session.read_options.row_restriction = f"{f_sql_str}"
# This example leverages Apache Avro.
requested_session.data_format = bigquery_storage.DataFormat.AVRO
parent = "projects/{}".format(self.project_id)
session = client.create_read_session(
request={
"parent": parent,
"read_session": requested_session,
"max_stream_count": int(self.max_parallel_conn),
},
)
self.last_offset = end['offset']
return [RangePartition(session, i) for i in range(len(session.streams))]
def read(self, partition) -> Iterator[List]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
from datetime import datetime
session = partition.session
stream_idx = partition.stream_idx
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.json_auth_file
client_1 = BigQueryReadClient()
# requested_session.read_options.selected_fields = ["census_tract", "clearance_date", "clearance_status"]
reader = client_1.read_rows(session.streams[stream_idx].name)
reader_iter = []
for message in reader.rows():
reader_iter_in = []
for k, v in message.items():
reader_iter_in.append(v)
# yield(reader_iter)
reader_iter.append(reader_iter_in)
# yield (message['hash'], message['size'], message['virtual_size'], message['version'])
# self.increment_latest_vals.append(max_incr_val)
return iter(reader_iter)
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
2단계: DataSource 정의
다음으로, 사용자 지정 데이터 원본을 정의합니다. 하위 클래스는 DataSource 다음 메서드를 구현해야 합니다.
name(cls) -> strschema(self) -> Union[StructType, str]
각 메서드에 대한 자세한 내용은 메서드를 참조 하세요.
from pyspark.sql.datasource import DataSource
from pyspark.sql.types import StructType
from google.cloud import bigquery
class BQStreamDataSource(DataSource):
"""
An example data source for streaming data from a public API containing users' comments.
"""
@classmethod
def name(cls):
return "bigquery-streaming"
def schema(self):
type_map = {'integer': 'long', 'float': 'double', 'record': 'string'}
json_auth_file = "/home/" + self.options.get("service_auth_json_file_name")
client = bigquery.Client.from_service_account_json(json_auth_file)
table_ref = self.options.get("project_id") + '.' + self.options.get("dataset") + '.' + self.options.get("table")
table = client.get_table(table_ref)
original_schema = table.schema
result = []
for schema in original_schema:
col_attr_name = schema.name
if (schema.mode != 'REPEATED'):
col_attr_type = type_map.get(schema.field_type.lower(), schema.field_type.lower())
else:
col_attr_type = f"array<{type_map.get(schema.field_type.lower(), schema.field_type.lower())}>"
result.append(col_attr_name + " " + col_attr_type)
return ",".join(result)
# return "census_tract double,clearance_date string,clearance_status string"
def streamReader(self, schema: StructType):
return BQStreamReader(schema, self.options)
3단계: 스트리밍 쿼리 구성 및 시작
마지막으로 커넥터를 등록한 다음 스트리밍 쿼리를 구성하고 시작합니다.
spark.dataSource.register(BQStreamDataSource)
# Ingests table data incrementally using the provided timestamp-based field.
# The latest value is checkpointed using offset semantics.
# Without the incremental input field, full table ingestion is performed.
# Service account JSON files must be available to every Spark executor worker
# in the /home folder using --files /home/<file_name>.json or an init script.
query = (
spark.readStream.format("bigquery-streaming")
.option("project_id", <bq_project_id>)
.option("incremental_checkpoint_field", <table_incremental_ts_based_col>)
.option("dataset", <bq_dataset_name>)
.option("table", <bq_table_name>)
.option("service_auth_json_file_name", <service_account_json_file_name>)
.option("max_parallel_conn", <max_parallel_threads_to_pull_data>) # defaults to max 1000
.load()
)
(
query.writeStream.trigger(processingTime="30 seconds")
.option("checkpointLocation", "checkpoint_path")
.foreachBatch(writeToTable) # your target table write function
.start()
)
실행 순서
사용자 지정 스트림의 펑톤 실행 순서는 아래에 설명되어 있습니다.
Spark 스트림 DataFrame을 로드하는 경우:
name(cls)
schema()
새 쿼리 시작의 마이크로배치(n)의 경우 또는 기존 쿼리를 다시 시작할 때(신규 또는 기존 검사점):
partitions(end_offset, end_offset) # loads the last saved offset from the checkpoint at query restart
latestOffset()
partitions(start_offset, end_offset) # plans partitions and distributes to Python workers
read() # user’s source read definition, runs on each Python worker
commit()
기존 검사점에서 실행 중인 쿼리의 다음(n+1) 마이크로배치의 경우:
latestOffset()
partitions(start_offset, end_offset)
read()
commit()
참고
이 함수는 latestOffset 검사점을 관리합니다. 함수 간에 기본 형식의 검사점 변수를 공유하고 사전으로 반환합니다. 예: return {"offset": str(self.last_offset)}
문제 해결
출력이 다음 오류인 경우 컴퓨팅은 PySpark 사용자 지정 데이터 원본을 지원하지 않습니다. Databricks Runtime 15.2 이상을 사용해야 합니다.
Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000