Partage via


DataSourceStreamWriter

Classe de base pour les enregistreurs de flux de données.

Les enregistreurs de flux de données sont responsables de l’écriture de données dans un récepteur de streaming. Implémentez cette classe et retournez une instance à partir de laquelle une source de DataSource.streamWriter() données peut être accessible en écriture en tant que récepteur de diffusion en continu. write() est appelé sur les exécuteurs pour chaque microbatch et commit() est abort() appelé sur le pilote une fois toutes les tâches du microbatch terminées.

Syntaxe

from pyspark.sql.datasource import DataSourceStreamWriter

class MyDataSourceStreamWriter(DataSourceStreamWriter):
    def write(self, iterator):
        ...

Méthodes

Méthode Description
write(iterator) Écrit des données dans le récepteur de streaming. Appelé sur les exécuteurs une fois par microbatch. Accepte un itérateur d’objets Row et retourne un WriterCommitMessagemessage de validation ou None s’il n’existe aucun message de validation. Cette méthode est abstraite et doit être implémentée.
commit(messages, batchId) Valide le microbatch à l’aide d’une liste de messages de validation collectés à partir de tous les exécuteurs. Appelé sur le pilote lorsque toutes les tâches du microbatch s’exécutent correctement.
abort(messages, batchId) Abandonne le microbatch à l’aide d’une liste de messages de validation collectés à partir de tous les exécuteurs. Appelé sur le pilote quand une ou plusieurs tâches dans le microbatch ont échoué.

Remarques

  • Le pilote collecte les messages de validation de tous les exécuteurs et les transmet si commit() toutes les tâches réussissent ou si abort() une tâche échoue.
  • En cas d’échec d’une tâche d’écriture, son message de validation se trouvera None dans la liste transmise ou commit()abort().
  • batchId identifie de manière unique chaque microbatch et incrémente de 1 avec chaque microbatch traité.

Exemples

Implémentez un enregistreur de flux qui ajoute des lignes à un fichier :

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceStreamWriter(DataSourceStreamWriter):
    def __init__(self, options):
        self.path = options.get("path")

    def write(self, iterator):
        rows = list(iterator)
        with open(self.path, "a") as f:
            for row in rows:
                f.write(str(row) + "\n")
        return MyCommitMessage(num_rows=len(rows))

    def commit(self, messages, batchId):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed batch {batchId} with {total} rows")

    def abort(self, messages, batchId):
        print(f"Batch {batchId} failed, performing cleanup")