DataSourceStreamWriter

Uma classe base para gravadores de fluxo de dados.

Os gravadores de fluxo de dados são responsáveis por gravar dados em um coletor de streaming. Implemente essa classe e retorne uma instância para tornar uma fonte de DataSource.streamWriter() dados gravável como um coletor de streaming. write() é chamado em executores para cada microbatch e commit() ou abort() é chamado no driver depois que todas as tarefas na microbatch são concluídas.

Sintaxe

from pyspark.sql.datasource import DataSourceStreamWriter

class MyDataSourceStreamWriter(DataSourceStreamWriter):
    def write(self, iterator):
        ...

Methods

Método Descrição
write(iterator) Grava dados no coletor de streaming. Chamado em executores uma vez por microbatch. Aceita um iterador de Row objetos e retorna um WriterCommitMessage, ou None se não há nenhuma mensagem de confirmação. Esse método é abstrato e deve ser implementado.
commit(messages, batchId) Confirma a microbatch usando uma lista de mensagens de confirmação coletadas de todos os executores. Invocado no driver quando todas as tarefas na microbatch são executadas com êxito.
abort(messages, batchId) Anula a microbatch usando uma lista de mensagens de confirmação coletadas de todos os executores. Invocado no driver quando uma ou mais tarefas na microbatch falharam.

Observações

  • O driver coleta mensagens de confirmação de todos os executores e as passa para commit() se todas as tarefas tiverem êxito ou se abort() alguma tarefa falhar.
  • Se uma tarefa de gravação falhar, sua mensagem de confirmação estará None na lista passada para commit() ou abort().
  • batchId identifica exclusivamente cada microbatch e incrementa em 1 com cada microbatch processado.

Exemplos

Implemente um gravador de fluxo que acrescente linhas a um arquivo:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceStreamWriter(DataSourceStreamWriter):
    def __init__(self, options):
        self.path = options.get("path")

    def write(self, iterator):
        rows = list(iterator)
        with open(self.path, "a") as f:
            for row in rows:
                f.write(str(row) + "\n")
        return MyCommitMessage(num_rows=len(rows))

    def commit(self, messages, batchId):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed batch {batchId} with {total} rows")

    def abort(self, messages, batchId):
        print(f"Batch {batchId} failed, performing cleanup")