Partager via


DataSourceStreamArrowWriter

Classe de base pour les enregistreurs de flux de données qui traitent les données à l’aide de RecordBatchPyArrow.

Contrairement DataSourceStreamWriterà ce qui fonctionne avec un itérateur d’objets Spark Row , cette classe est optimisée pour le format Flèche lors de l’écriture de données de diffusion en continu. Il peut offrir de meilleures performances lors de l’interfaçage avec des systèmes ou des bibliothèques qui prennent en charge Arrow en mode natif pour les cas d’utilisation de streaming. Implémentez cette classe et retournez une instance à partir de laquelle une source de données peut être accessible en écriture en tant que récepteur de diffusion en continu à l’aide de DataSource.streamWriter() Arrow.

Syntaxe

from pyspark.sql.datasource import DataSourceStreamArrowWriter

class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
    def write(self, iterator):
        ...

Méthodes

Méthode Description
write(iterator) Écrit un itérateur d’objets PyArrow RecordBatch dans le récepteur de streaming. Appelé sur les exécuteurs une fois par microbatch. Retourne un WriterCommitMessagemessage de validation ou None s’il n’existe aucun message de validation. Cette méthode est abstraite et doit être implémentée.
commit(messages, batchId) Valide le microbatch à l’aide d’une liste de messages de validation collectés à partir de tous les exécuteurs. Appelé sur le pilote lorsque toutes les tâches du microbatch s’exécutent correctement. Hérité de DataSourceStreamWriter.
abort(messages, batchId) Abandonne le microbatch à l’aide d’une liste de messages de validation collectés à partir de tous les exécuteurs. Appelé sur le pilote quand une ou plusieurs tâches dans le microbatch ont échoué. Hérité de DataSourceStreamWriter.

Remarques

  • Le pilote collecte les messages de validation de tous les exécuteurs et les transmet si commit() toutes les tâches réussissent ou si abort() une tâche échoue.
  • En cas d’échec d’une tâche d’écriture, son message de validation se trouvera None dans la liste transmise ou commit()abort().
  • batchId identifie de manière unique chaque microbatch et incrémente de 1 avec chaque microbatch traité.

Exemples

Implémentez un enregistreur de flux basé sur les flèches qui compte les lignes par microbatch :

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamArrowWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
    def write(self, iterator):
        total_rows = 0
        for batch in iterator:
            total_rows += len(batch)
        return MyCommitMessage(num_rows=total_rows)

    def commit(self, messages, batchId):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed batch {batchId} with {total} rows")

    def abort(self, messages, batchId):
        print(f"Batch {batchId} failed, performing cleanup")