Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Classe de base pour les enregistreurs de sources de données qui traitent les données à l’aide de PyArrow’s RecordBatch.
Contrairement DataSourceWriterà ce qui fonctionne avec un itérateur d’objets Spark Row , cette classe est optimisée pour le format Flèche lors de l’écriture de données. Il peut offrir de meilleures performances lors de l’interfaçage avec des systèmes ou des bibliothèques qui prennent en charge Arrow en mode natif. Implémentez cette classe et retournez une instance à partir de laquelle une source de données peut être accessible en écriture à l’aide de DataSource.writer() Arrow.
Syntaxe
from pyspark.sql.datasource import DataSourceArrowWriter
class MyDataSourceArrowWriter(DataSourceArrowWriter):
def write(self, iterator):
...
Méthodes
| Méthode | Description |
|---|---|
write(iterator) |
Écrit un itérateur d’objets PyArrow RecordBatch dans le récepteur. Appelé une fois sur chaque exécuteur. Retourne un WriterCommitMessagemessage de validation ou None s’il n’existe aucun message de validation. Cette méthode est abstraite et doit être implémentée. |
commit(messages) |
Valide le travail d’écriture à l’aide d’une liste de messages de validation collectés à partir de tous les exécuteurs. Appelé sur le pilote lorsque toutes les tâches s’exécutent correctement. Hérité de DataSourceWriter. |
abort(messages) |
Abandonne le travail d’écriture à l’aide d’une liste de messages de validation collectés à partir de tous les exécuteurs. Appelé sur le pilote lorsqu’une ou plusieurs tâches ont échoué. Hérité de DataSourceWriter. |
Remarques
- Le pilote collecte les messages de validation de tous les exécuteurs et les transmet si
commit()toutes les tâches réussissent ou siabort()une tâche échoue. - En cas d’échec d’une tâche d’écriture, son message de validation se trouvera
Nonedans la liste transmise oucommit()abort().
Exemples
Implémentez un enregistreur basé sur des flèches qui compte des lignes sur tous les lots :
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceArrowWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceArrowWriter(DataSourceArrowWriter):
def write(self, iterator):
total_rows = 0
for batch in iterator:
total_rows += len(batch)
return MyCommitMessage(num_rows=total_rows)
def commit(self, messages):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed {total} rows")
def abort(self, messages):
print("Write job failed, performing cleanup")