Partager via


DataSourceStreamReader

Classe de base pour les lecteurs de sources de données de streaming.

Les lecteurs de flux de source de données sont responsables de la sortie des données d’une source de données de diffusion en continu. Implémentez cette classe et retournez une instance à partir de laquelle une source de DataSource.streamReader() données est lisible en tant que source de streaming.

Syntaxe

from pyspark.sql.datasource import DataSourceStreamReader

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        ...

    def partitions(self, start, end):
        ...

    def read(self, partition):
        ...

Méthodes

Méthode Description
initialOffset() Retourne le décalage initial de la source de données de diffusion en continu sous forme de dictée. Une nouvelle requête de streaming commence à lire à partir de ce décalage. Les requêtes redémarrées reprendnt à partir du décalage point de contrôle à la place.
partitions(start, end) Retourne une séquence d’objets InputPartition représentant les données entre start et end décalages. Retourne une séquence vide si start elle est endégale.
read(partition) Génère des données pour une partition donnée et retourne un itérateur d’objets tuples, lignes ou PyArrow RecordBatch . Chaque tuple ou ligne est converti en ligne dans le DataFrame final. Cette méthode est abstraite et doit être implémentée.
commit(end) Informe la source que Spark a terminé de traiter toutes les données pour les décalages inférieurs ou égaux à end. Spark demande uniquement des décalages supérieurs end à ceux à venir.
stop() Arrête la source et libère toutes les ressources qu’il a allouées. Appelé lorsque la requête de diffusion en continu se termine.

Remarques

  • read() est statique et sans état. N’accédez pas aux membres de classe mutables ou conservez l’état en mémoire entre différents appels de read().
  • Toutes les valeurs de partition retournées par partitions() doivent être des objets sélectionnables.
  • Les décalages sont représentés sous forme de dictée ou de dictée récursive dont les clés et les valeurs sont des types primitifs : entier, chaîne ou booléen.

Exemples

Implémentez un lecteur de streaming qui lit à partir d’une séquence d’enregistrements indexés :

from pyspark.sql.datasource import (
    DataSource,
    DataSourceStreamReader,
    InputPartition,
)

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        return {"index": 0}

    def latestOffset(self, start, limit):
        return {"index": start["index"] + 10}

    def partitions(self, start, end):
        return [
            InputPartition(i)
            for i in range(start["index"], end["index"])
        ]

    def read(self, partition):
        yield (partition.value, f"record-{partition.value}")

    def commit(self, end):
        print(f"Committed up to offset {end}")

    def stop(self):
        print("Stopping stream reader")