Udostępnij za pośrednictwem


Niestandardowe źródła danych PySpark

Niestandardowe źródła danych PySpark są tworzone przy użyciu interfejsu API źródła danych języka Python (PySpark), który umożliwia odczytywanie z niestandardowych źródeł danych i zapisywanie w niestandardowych ujściach danych na platformie Apache Spark przy użyciu języka Python. Niestandardowe źródła danych PySpark umożliwiają definiowanie niestandardowych połączeń z systemami danych i implementowanie dodatkowych funkcji w celu tworzenia źródeł danych wielokrotnego użytku.

Uwaga

Niestandardowe źródła danych PySpark wymagają środowiska Databricks Runtime 15.4 LTS lub środowiska bezserwerowego w wersji 2.

Klasa DataSource

PySpark DataSource to klasa bazowa, która udostępnia metody tworzenia czytników danych i składników zapisywania.

Implementowanie podklasy źródła danych

W zależności od przypadku użycia następujące elementy muszą zostać zaimplementowane przez każdą podklasę, aby źródło danych było czytelne, zapisywalne lub oba:

Właściwość lub metoda opis
name Wymagane. Nazwa źródła danych
schema Wymagane. Schemat źródła danych do odczytu lub zapisu
reader() Musi zwrócić wartość , DataSourceReader aby źródło danych było czytelne (wsadowe)
writer() Musi zwrócić wartość , DataSourceWriter aby ujście danych było zapisywalne (wsadowe)
streamReader() lub simpleStreamReader() Musi zwrócić wartość , DataSourceStreamReader aby strumień danych był czytelny (przesyłanie strumieniowe)
streamWriter() Musi zwrócić wartość , DataSourceStreamWriter aby strumień danych był zapisywalny (przesyłanie strumieniowe)

Uwaga

Zdefiniowane przez użytkownika DataSource, DataSourceReader, DataSourceWriter, DataSourceStreamReader i DataSourceStreamWriter oraz ich metody muszą być serializowalne. Innymi słowy, muszą być słownikiem lub zagnieżdżonym słownikiem zawierającym typ pierwotny.

Rejestrowanie źródła danych

Po zaimplementowaniu interfejsu należy go zarejestrować, a następnie załadować go lub użyć go w inny sposób, jak pokazano w poniższym przykładzie:

# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

Przykład 1. Tworzenie źródła danych PySpark dla zapytania wsadowego

Aby zademonstrować możliwości czytnika źródła danych PySpark, utwórz źródło danych, które generuje przykładowe dane przy użyciu faker pakietu języka Python. Aby uzyskać więcej informacji na temat faker, zobacz dokumentację Faker.

faker Zainstaluj pakiet przy użyciu następującego polecenia:

%pip install faker

Krok 1: Zaimplementuj czytnik dla zapytania wsadowego

Najpierw zaimplementuj logikę czytelnika, aby wygenerować przykładowe dane. Użyj zainstalowanej faker biblioteki, aby wypełnić każde pole w schemacie.

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

Krok 2. Definiowanie przykładowego źródła danych

Następnie zdefiniuj nowe źródło danych PySpark jako podklasę DataSource o nazwie, schemacie i czytniku. Metoda reader() musi być zdefiniowana, aby odczytać dane ze źródła w zapytaniu wsadowym.

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

Krok 3. Rejestrowanie i używanie przykładowego źródła danych

Aby użyć źródła danych, zarejestruj je. Domyślnie element FakeDataSource ma trzy wiersze, a schemat zawiera następujące string pola: name, , datezipcode, state. Poniższy przykład rejestruje, ładuje i generuje przykładowe źródło danych z wartościami domyślnymi:

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

Obsługiwane są tylko string pola, ale można określić schemat z dowolnymi polami odpowiadającymi faker polam dostawców pakietów w celu wygenerowania losowych danych na potrzeby testowania i programowania. Poniższy przykład ładuje źródło danych z polami name i company.

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

Aby załadować źródło danych z niestandardową liczbą wierszy, określ numRows opcję. W poniższym przykładzie określono 5 wierszy:

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

Przykład 2: Tworzenie źródła danych PySpark w usłudze GitHub przy użyciu wariantów

Aby zademonstrować użycie wariantów w źródle danych PySpark, w tym przykładzie jest tworzone źródło danych, które odczytuje żądania ściągnięcia z usługi GitHub.

Uwaga

Warianty są obsługiwane w niestandardowym źródle danych PySpark w środowisku Databricks Runtime 17.1 i nowszym.

Aby uzyskać informacje o wariantach, zobacz Zapytanie dotyczące danych wariantów.

Krok 1: Zaimplementuj moduł do pobierania pull requestów

Najpierw zaimplementuj logikę czytnika, aby pobrać żądania pull z określonego repozytorium GitHub.

class GithubVariantPullRequestReader(DataSourceReader):
    def __init__(self, options):
        self.token = options.get("token")
        self.repo = options.get("path")
        if self.repo is None:
            raise Exception(f"Must specify a repo in `.load()` method.")
        # Every value in this `self.options` dictionary is a string.
        self.num_rows = int(options.get("numRows", 10))

    def read(self, partition):
        header = {
            "Accept": "application/vnd.github+json",
        }
        if self.token is not None:
            header["Authorization"] = f"Bearer {self.token}"
        url = f"https://api.github.com/repos/{self.repo}/pulls"
        response = requests.get(url, headers=header)
        response.raise_for_status()
        prs = response.json()
        for pr in prs[:self.num_rows]:
            yield Row(
                id = pr.get("number"),
                title = pr.get("title"),
                user = VariantVal.parseJson(json.dumps(pr.get("user"))),
                created_at = pr.get("created_at"),
                updated_at = pr.get("updated_at")
            )

Krok 2. Definiowanie źródła danych usługi GitHub

Następnie zdefiniuj nowe źródło danych PySpark w usłudze GitHub jako podklasę DataSource o nazwie, schemacie i metodzie reader(). Schemat zawiera następujące pola: id, , titleuser, created_at, updated_at. Pole user jest definiowane jako wariant.

import json
import requests

from pyspark.sql import Row
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import VariantVal

class GithubVariantDataSource(DataSource):
    @classmethod
    def name(self):
        return "githubVariant"
    def schema(self):
        return "id int, title string, user variant, created_at string, updated_at string"
    def reader(self, schema):
        return GithubVariantPullRequestReader(self.options)

Krok 3. Rejestrowanie i używanie źródła danych

Aby użyć źródła danych, zarejestruj je. Poniższy przykład rejestruje, a następnie wczytuje z źródła danych i generuje trzy wiersze danych z żądań ściągnięcia (PR) w repozytorium GitHub:

spark.dataSource.register(GithubVariantDataSource)
spark.read.format("githubVariant").option("numRows", 3).load("apache/spark").display()
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+
| id      | title                                               | user                | created_at           | updated_at           |
+---------+---------------------------------------------------- +---------------------+----------------------+----------------------+
|   51293 |[SPARK-52586][SQL] Introduce AnyTimeType             |  {"avatar_url":...} | 2025-06-26T09:20:59Z | 2025-06-26T15:22:39Z |
|   51292 |[WIP][PYTHON] Arrow UDF for aggregation              |  {"avatar_url":...} | 2025-06-26T07:52:27Z | 2025-06-26T07:52:37Z |
|   51290 |[SPARK-50686][SQL] Hash to sort aggregation fallback |  {"avatar_url":...} | 2025-06-26T06:19:58Z | 2025-06-26T06:20:07Z |
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+

Przykład 3. Tworzenie źródła danych PySpark na potrzeby przesyłania strumieniowego odczytu i zapisu

pl-PL: Aby zademonstrować możliwości czytnika i zapisu strumienia PySpark DataSource, utwórz przykładowe źródło danych, które generuje dwa wiersze w każdej mikropartycji, korzystając z pakietu faker języka Python. Aby uzyskać więcej informacji na temat faker, zobacz dokumentację Faker.

faker Zainstaluj pakiet przy użyciu następującego polecenia:

%pip install faker

Krok 1. Implementowanie czytnika strumienia

Najpierw zaimplementuj przykładowy czytnik danych strumieniowych, który generuje dwa wiersze w każdej mikropartii. Można zaimplementować metodę DataSourceStreamReaderlub jeśli źródło danych ma niską przepływność i nie wymaga partycjonowania, możesz zaimplementować SimpleDataSourceStreamReader zamiast tego. Albo simpleStreamReader(), albo streamReader() musi być zaimplementowany, a simpleStreamReader() jest wywoływany tylko wtedy, gdy streamReader() nie jest zaimplementowany.

Implementacja DataSourceStreamReader

Wystąpienie streamReader ma przesunięcie całkowite, które zwiększa się o 2 w każdej mikropartii, zaimplementowane za pomocą interfejsu DataSourceStreamReader.

from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

Implementacja SimpleDataSourceStreamReader

Wystąpienie SimpleStreamReader jest takie samo jak wystąpienie FakeStreamReader, które generuje dwa wiersze w każdym przetwarzaniu partii, ale jest zaimplementowane z użyciem interfejsu SimpleDataSourceStreamReader bez partycjonowania.

class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

Krok 2. Implementowanie modułu zapisywania strumienia

Następnie zaimplementuj zapis strumieniowy. Moduł strumieniowego zapisu danych zapisuje informacje o metadanych każdego mikropakietu do ścieżki lokalnej.

from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage

class SimpleCommitMessage(WriterCommitMessage):
   def __init__(self, partition_id: int, count: int):
       self.partition_id = partition_id
       self.count = count

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data and then returns the commit message for that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

Krok 3. Definiowanie przykładowego źródła danych

Teraz zdefiniuj nowe źródło danych PySpark jako podklasę DataSource o nazwie, schemacie i metodach streamReader() oraz streamWriter().

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

Krok 4. Rejestrowanie i używanie przykładowego źródła danych

Aby użyć źródła danych, zarejestruj je. Po zarejestrowaniu można go używać w zapytaniach przesyłanych strumieniowo jako źródło lub ujście, przekazując krótką nazwę lub pełną nazwę do format(). Poniższy przykład rejestruje źródło danych, a następnie uruchamia zapytanie, które odczytuje dane z przykładowego źródła i przekazuje je do konsoli.

spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

Alternatywnie poniższy kod używa przykładowego strumienia jako ujścia i określa ścieżkę wyjściową:

spark.dataSource.register(FakeStreamDataSource)

# Make sure the output directory exists and is writable
output_path = "/output_path"
dbutils.fs.mkdirs(output_path)
checkpoint_path = "/output_path/checkpoint"

query = (
    spark.readStream
    .format("fakestream")
    .load()
    .writeStream
    .format("fakestream")
    .option("path", output_path)
    .option("checkpointLocation", checkpoint_path)
    .start()
)

Przykład 4. Tworzenie łącznika przesyłania strumieniowego Google BigQuery

W poniższym przykładzie pokazano, jak utworzyć niestandardowy łącznik przesyłania strumieniowego dla usługi Google BigQuery (BQ) przy użyciu źródła danych PySpark. Usługa Databricks zapewnia łącznik Spark do pozyskiwania partii danych z BigQuery, a federacja Lakehouse może również zdalnie łączyć się z dowolnym zbiorem danych BigQuery i pobierać dane poprzez tworzenie obcego katalogu, jednak żaden z tych systemów nie obsługuje w pełni przyrostowych ani ciągłych przepływów pracy przesyłania strumieniowego. Ten łącznik umożliwia przyrostową migrację danych fazowo oraz migrację prawie w czasie rzeczywistym z tabel BigQuery zasilanych przez źródła przesyłania strumieniowego z tworzeniem trwałych punktów kontrolnych.

Ten łącznik niestandardowy ma następujące funkcje:

  • Kompatybilny ze streamowaniem strukturalnym i deklaratywnymi potokami Spark Lakeflow.
  • Obsługuje śledzenie przyrostowe rekordów i ciągłe pozyskiwanie danych oraz zachowuje semantykę przesyłania strumieniowego ze strukturą.
  • Używa interfejsu API Storage BigQuery z protokołem opartym na RPC, co umożliwia szybszą i tańszą transmisję danych.
  • Zapisuje zmigrowane tabele bezpośrednio do Unity Catalog.
  • Automatycznie zarządza punktami kontrolnymi przy użyciu pola przyrostowego opartego na dacie lub znaczniku czasu.
  • Obsługuje przetwarzanie wsadowe przy użyciu Trigger.AvailableNow().
  • Nie wymaga pośredniego magazynu w chmurze.
  • Serializuje transmisję danych BigQuery przy użyciu formatu Arrow lub Avro.
  • Obsługuje automatyczne równoleglenie i rozdziela zadania między pracownikami Spark w zależności od wolumenu danych.
  • Nadaje się do migracji warstw nieprzetworzonych i brązowych z BigQuery z obsługą migracji warstw „Silver” i „Gold” przy użyciu wzorców SCD typu 1 lub typu 2.

Wymagania wstępne

Przed zaimplementowaniem łącznika niestandardowego zainstaluj wymagane pakiety:

%pip install faker google.cloud google.cloud.bigquery google.cloud.bigquery_storage

Krok 1. Implementowanie czytnika strumienia

Najpierw zaimplementuj moduł odczytu danych strumieniowych. Podklasa DataSourceStreamReader musi implementować następujące metody:

  • initialOffset(self) -> dict
  • latestOffset(self) -> dict
  • partitions(self, start: dict, end: dict) -> Sequence[InputPartition]
  • read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator[Row]]
  • commit(self, end: dict) -> None
  • stop(self) -> None

Aby uzyskać szczegółowe informacje o każdej metodzie, zobacz Metody.

import os
from pyspark.sql.datasource import DataSourceStreamReader, InputPartition
from pyspark.sql.datasource import DataSourceStreamWriter
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.datasource import DataSource
from pathlib import Path
from pyarrow.lib import TimestampScalar
from datetime import datetime
from typing import Iterator, Tuple, Any, Dict, List, Sequence
from google.cloud.bigquery_storage import BigQueryReadClient, ReadSession
from google.cloud import bigquery_storage
import pandas
import datetime
import uuid
import time, logging

start_time = time.time()


class RangePartition(InputPartition):
    def __init__(self, session: ReadSession, stream_idx: int):
        self.session = session
        self.stream_idx = stream_idx


class BQStreamReader(DataSourceStreamReader):

    def __init__(self, schema, options):
        self.project_id = options.get("project_id")
        self.dataset = options.get("dataset")
        self.table = options.get("table")
        self.json_auth_file = "/home/"+options.get("service_auth_json_file_name")
        self.max_parallel_conn = options.get("max_parallel_conn", 1000)
        self.incremental_checkpoint_field = options.get("incremental_checkpoint_field", "")

        self.last_offset = None

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        from datetime import datetime
        logging.info("Inside initialOffset!!!!!")
        # self.increment_latest_vals.append(datetime.strptime('1900-01-01 23:57:12', "%Y-%m-%d %H:%M:%S"))
        self.last_offset = '1900-01-01 23:57:12'

        return {"offset": str(self.last_offset)}

    def latestOffset(self):
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        from datetime import datetime
        from google.cloud import bigquery

        if (self.last_offset is None):
            self.last_offset = '1900-01-01 23:57:12'

        client = bigquery.Client.from_service_account_json(self.json_auth_file)
        # max_offset=start["offset"]
        logging.info(f"************************last_offset: {self.last_offset}***********************")
        f_sql_str = ''
        for x_str in self.incremental_checkpoint_field.strip().split(","):
            f_sql_str += f"{x_str}>'{self.last_offset}' or "
        f_sql_str = f_sql_str[:-3]
        job_query = client.query(
            f"select max({self.incremental_checkpoint_field}) from {self.project_id}.{self.dataset}.{self.table} where {f_sql_str}")
        for query in job_query.result():
            max_res = query[0]

        if (str(max_res).lower() != 'none'):
            return {"offset": str(max_res)}

        return {"offset": str(self.last_offset)}

    def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:

        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        if (self.last_offset is None):
            self.last_offset = end['offset']

        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.json_auth_file

        # project_id = self.auth_project_id

        client = BigQueryReadClient()

        # This example reads baby name data from the public datasets.
        table = "projects/{}/datasets/{}/tables/{}".format(
            self.project_id, self.dataset, self.table
        )
        requested_session = bigquery_storage.ReadSession()
        requested_session.table = table
        if (self.incremental_checkpoint_field != ''):
            start_offset = start["offset"]
            end_offset = end["offset"]
            f_sql_str = ''
            for x_str in self.incremental_checkpoint_field.strip().split(","):
                f_sql_str += f"({x_str}>'{start_offset}' and {x_str}<='{end_offset}') or "
            f_sql_str = f_sql_str[:-3]
            requested_session.read_options.row_restriction = f"{f_sql_str}"

        # This example leverages Apache Avro.
        requested_session.data_format = bigquery_storage.DataFormat.AVRO

        parent = "projects/{}".format(self.project_id)
        session = client.create_read_session(
            request={
                "parent": parent,
                "read_session": requested_session,
                "max_stream_count": int(self.max_parallel_conn),
            },
        )
        self.last_offset = end['offset']
        return [RangePartition(session, i) for i in range(len(session.streams))]

    def read(self, partition) -> Iterator[List]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        from datetime import datetime
        session = partition.session
        stream_idx = partition.stream_idx
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.json_auth_file
        client_1 = BigQueryReadClient()
        # requested_session.read_options.selected_fields = ["census_tract", "clearance_date", "clearance_status"]
        reader = client_1.read_rows(session.streams[stream_idx].name)
        reader_iter = []

        for message in reader.rows():
            reader_iter_in = []
            for k, v in message.items():
                reader_iter_in.append(v)
            # yield(reader_iter)
            reader_iter.append(reader_iter_in)
            # yield (message['hash'], message['size'], message['virtual_size'], message['version'])
        # self.increment_latest_vals.append(max_incr_val)
        return iter(reader_iter)

    def commit(self, end):

        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

Krok 2. Definiowanie źródła danych

Następnie zdefiniuj niestandardowe źródło danych. Podklasa DataSource musi implementować następujące metody:

  • name(cls) -> str
  • schema(self) -> Union[StructType, str]

Aby uzyskać szczegółowe informacje o każdej metodzie, zobacz Metody.

from pyspark.sql.datasource import DataSource
from pyspark.sql.types import StructType
from google.cloud import bigquery

class BQStreamDataSource(DataSource):
    """
    An example data source for streaming data from a public API containing users' comments.
    """

    @classmethod
    def name(cls):
        return "bigquery-streaming"

    def schema(self):
        type_map = {'integer': 'long', 'float': 'double', 'record': 'string'}
        json_auth_file = "/home/" + self.options.get("service_auth_json_file_name")
        client = bigquery.Client.from_service_account_json(json_auth_file)
        table_ref = self.options.get("project_id") + '.' + self.options.get("dataset") + '.' + self.options.get("table")
        table = client.get_table(table_ref)
        original_schema = table.schema
        result = []
        for schema in original_schema:
            col_attr_name = schema.name
            if (schema.mode != 'REPEATED'):
                col_attr_type = type_map.get(schema.field_type.lower(), schema.field_type.lower())
            else:
                col_attr_type = f"array<{type_map.get(schema.field_type.lower(), schema.field_type.lower())}>"
            result.append(col_attr_name + " " + col_attr_type)

        return ",".join(result)
        # return "census_tract double,clearance_date string,clearance_status string"

    def streamReader(self, schema: StructType):
        return BQStreamReader(schema, self.options)

Krok 3: Skonfiguruj i uruchom zapytanie strumieniowe

Na koniec zarejestruj łącznik, a następnie skonfiguruj i uruchom zapytanie przesyłania strumieniowego:

spark.dataSource.register(BQStreamDataSource)

# Ingests table data incrementally using the provided timestamp-based field.
# The latest value is checkpointed using offset semantics.
# Without the incremental input field, full table ingestion is performed.
# Service account JSON files must be available to every Spark executor worker
# in the /home folder using --files /home/<file_name>.json or an init script.

query = (
    spark.readStream.format("bigquery-streaming")
    .option("project_id", <bq_project_id>)
    .option("incremental_checkpoint_field", <table_incremental_ts_based_col>)
    .option("dataset", <bq_dataset_name>)
    .option("table", <bq_table_name>)
    .option("service_auth_json_file_name", <service_account_json_file_name>)
    .option("max_parallel_conn", <max_parallel_threads_to_pull_data>)  # defaults to max 1000
    .load()
)

(
    query.writeStream.trigger(processingTime="30 seconds")
    .option("checkpointLocation", "checkpoint_path")
    .foreachBatch(writeToTable)  # your target table write function
    .start()
)

Kolejność wykonywania

Kolejność wykonywania funkcji strumienia niestandardowego została opisana poniżej.

Aby załadować ramkę danych strumienia Spark:

name(cls)
schema()

W przypadku mikropartii (n) nowego uruchomienia zapytania lub ponownego uruchomienia istniejącego zapytania (z nowym lub istniejącym punktem kontrolnym):

partitions(end_offset, end_offset)  # loads the last saved offset from the checkpoint at query restart
latestOffset()
partitions(start_offset, end_offset)  # plans partitions and distributes to Python workers
read()  # user’s source read definition, runs on each Python worker
commit()

Dla następnej (n+1) mikropartii działającej kwerendy przy istniejącym punkcie kontrolnym:

latestOffset()
partitions(start_offset, end_offset)
read()
commit()

Uwaga

Funkcja latestOffset organizuje punkty kontrolne. Udostępnianie zmiennej punktu kontrolnego typu pierwotnego między funkcjami i zwracanie jej jako słownika. Przykład: return {"offset": str(self.last_offset)}

Rozwiązywanie problemów

Jeśli dane wyjściowe są następujące, Twoje środowisko obliczeniowe nie obsługuje niestandardowych źródeł danych PySpark. Musisz użyć środowiska Databricks Runtime 15.2 lub nowszego.

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000