Partager via


DataSourceArrowWriter

Classe de base pour les enregistreurs de sources de données qui traitent les données à l’aide de PyArrow’s RecordBatch.

Contrairement DataSourceWriterà ce qui fonctionne avec un itérateur d’objets Spark Row , cette classe est optimisée pour le format Flèche lors de l’écriture de données. Il peut offrir de meilleures performances lors de l’interfaçage avec des systèmes ou des bibliothèques qui prennent en charge Arrow en mode natif. Implémentez cette classe et retournez une instance à partir de laquelle une source de données peut être accessible en écriture à l’aide de DataSource.writer() Arrow.

Syntaxe

from pyspark.sql.datasource import DataSourceArrowWriter

class MyDataSourceArrowWriter(DataSourceArrowWriter):
    def write(self, iterator):
        ...

Méthodes

Méthode Description
write(iterator) Écrit un itérateur d’objets PyArrow RecordBatch dans le récepteur. Appelé une fois sur chaque exécuteur. Retourne un WriterCommitMessagemessage de validation ou None s’il n’existe aucun message de validation. Cette méthode est abstraite et doit être implémentée.
commit(messages) Valide le travail d’écriture à l’aide d’une liste de messages de validation collectés à partir de tous les exécuteurs. Appelé sur le pilote lorsque toutes les tâches s’exécutent correctement. Hérité de DataSourceWriter.
abort(messages) Abandonne le travail d’écriture à l’aide d’une liste de messages de validation collectés à partir de tous les exécuteurs. Appelé sur le pilote lorsqu’une ou plusieurs tâches ont échoué. Hérité de DataSourceWriter.

Remarques

  • Le pilote collecte les messages de validation de tous les exécuteurs et les transmet si commit() toutes les tâches réussissent ou si abort() une tâche échoue.
  • En cas d’échec d’une tâche d’écriture, son message de validation se trouvera None dans la liste transmise ou commit()abort().

Exemples

Implémentez un enregistreur basé sur des flèches qui compte des lignes sur tous les lots :

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceArrowWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceArrowWriter(DataSourceArrowWriter):
    def write(self, iterator):
        total_rows = 0
        for batch in iterator:
            total_rows += len(batch)
        return MyCommitMessage(num_rows=total_rows)

    def commit(self, messages):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed {total} rows")

    def abort(self, messages):
        print("Write job failed, performing cleanup")