Partilhar via


Modo em tempo real no Structured Streaming

Important

Este recurso está no Public Preview.

Esta página descreve o modo em tempo real, um tipo de gatilho para Streaming Estruturado que permite o processamento de dados de latência ultrabaixa com latência de ponta a ponta tão baixa quanto 5 ms. Esse modo foi projetado para cargas de trabalho operacionais que exigem resposta imediata a dados de streaming.

O modo em tempo real está disponível no Databricks Runtime 16.4 LTS e versões posteriores.

Cargas de trabalho operacionais

As cargas de trabalho de streaming podem ser amplamente divididas em cargas de trabalho analíticas e cargas de trabalho operacionais:

  • Trabalhos analíticos usam o carregamento e a transformação de dados, normalmente seguindo a arquitetura medalhão (por exemplo, carregamento de dados nas tabelas bronze, prata e ouro).
  • As cargas de trabalho operacionais consomem dados em tempo real, aplicam a lógica de negócios e acionam ações ou decisões downstream.

Alguns exemplos de cargas de trabalho operacionais são:

  • Bloquear ou sinalizar uma transação de cartão de crédito em tempo real se uma pontuação de fraude exceder um limite, com base em fatores como localização incomum, grande tamanho da transação ou padrões de gastos rápidos.
  • Enviar uma mensagem promocional quando os dados de fluxo de cliques indicam que um utilizador está a navegar por jeans há cinco minutos, oferecendo um desconto de 25% se o fizer nos próximos 15 minutos.

Em geral, as cargas de trabalho operacionais são caracterizadas pela necessidade de latência de ponta a ponta abaixo do segundo. Isso pode ser alcançado com o modo em tempo real no Apache Spark Structured Streaming.

Como o modo em tempo real alcança baixa latência

O modo em tempo real melhora a arquitetura de execução ao:

  • Execução de lotes de longa duração (o padrão é 5 minutos), nos quais os dados são processados à medida que ficam disponíveis na fonte.
  • Todas as etapas da consulta são agendadas simultaneamente. Isso requer que o número de slots de tarefas disponíveis seja igual ou maior do que o número de tarefas de todos os estágios de um lote.
  • Os dados são transferidos entre etapas assim que são produzidos, utilizando um shuffle em streaming.

No final do processamento de um lote, e antes do próximo lote começar, os pontos de verificação do Streaming Estruturado progridem e disponibilizam métricas para o último lote. Se os lotes forem mais longos, essas atividades podem ser menos frequentes, levando a repetições mais longas em caso de falha e atraso na disponibilidade das métricas. Por outro lado, se os lotes forem menores, essas atividades se tornam mais frequentes, potencialmente afetando a latência. O Databricks recomenda comparar o modo em tempo real com a carga de trabalho e os requisitos de destino para encontrar o intervalo de gatilho apropriado.

Configuração do cluster

Para usar o modo em tempo real no Structured Streaming, você deve configurar um trabalho Lakeflow clássico:

  1. No seu espaço de trabalho do Azure Databricks, clique em Novo no canto superior esquerdo. Escolha Mais e clique em Cluster.

  2. Clara aceleração de fótons.

  3. Deselecione Ativar dimensionamento automático.

  4. Em Desempenho avançado, desmarque Usar instâncias spot.

  5. Em Modo Avançado e Acesso, clique em Manual e selecione Dedicado (anteriormente: Usuário único).

  6. Em Spark, digite o seguinte em Configuração do Spark:

    spark.databricks.streaming.realTimeMode.enabled true
    
  7. Clique em Criar.

Requisitos de tamanho do cluster

Você pode executar um trabalho em tempo real por cluster se o cluster tiver slots de tarefas suficientes.

Para ser executado no modo de baixa latência, o número total de slots de tarefas disponíveis deve ser maior ou igual ao número de tarefas em todos os estágios de consulta.

Exemplos de cálculo de faixas horárias

Gasoduto sem estado de estágio único (fonte Kafka + sumidouro):

Se maxPartitions = 8, você precisa de pelo menos 8 slots. Se maxPartitions não estiver definido, use o número de partições de tópico Kafka.

Gasoduto stateful de dois estágios (fonte Kafka + shuffle):

Se maxPartitions = 8 e shuffle partitions = 20, você precisa de 8 + 20 = 28 slots.

Canalização de três estágios (fonte Kafka + mistura + redistribuição)

Com maxPartitions = 8 e dois estágios de shuffle de 20 cada, você precisa de 8 + 20 + 20 = 48 slots.

Principais considerações

Ao configurar o cluster, leve isso em consideração:

  • Ao contrário do modo de microlote, as tarefas em tempo real podem ficar ociosas enquanto aguardam dados, portanto, o dimensionamento correto é essencial para evitar o desperdício de recursos.
  • Visar um nível de utilização alvo (por exemplo, 50%) ajustando:
    • maxPartitions (para Kafka)
    • spark.sql.shuffle.partitions (para fases aleatórias)
  • A Databricks recomenda definir o maxPartitions para que cada tarefa lide com várias partições Kafka, reduzindo a sobrecarga.
  • Ajuste os slots de tarefas por trabalhador para corresponder à carga de trabalho para trabalhos simples de um estágio.
  • Para trabalhos com muita reorganização, experimente encontrar o número mínimo de partições de reorganização que evitem acumulações e ajuste a partir daí. O trabalho não será agendado se o cluster não tiver slots suficientes.

Note

A partir do Databricks Runtime 16.4 LTS e posteriores, todos os pipelines em tempo real utilizam checkpoint v2, o que permite uma alternância fluida entre modos em tempo real e micro-batch.

Configuração da consulta

Você deve habilitar o gatilho em tempo real para especificar que uma consulta deve ser executada usando o modo de baixa latência. Além disso, os gatilhos em tempo real são suportados apenas no modo de atualização. Por exemplo:

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # in PySpark, realTime trigger requires you to specify the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Observability

Anteriormente, a latência da consulta de ponta a ponta estava intimamente ligada à duração do lote, tornando a duração do lote um bom indicador de latência da consulta. No entanto, este método já não se aplica em tempo real, exigindo abordagens alternativas para medir a latência. A latência de ponta a ponta é específica da carga de trabalho e, às vezes, só pode ser medida com precisão com a lógica de negócios. Por exemplo, se o carimbo de data/hora de origem for gerado em Kafka, a latência pode ser calculada como a diferença entre o carimbo de data/hora de saída de Kafka e o carimbo de data/hora de origem.

Você pode estimar a latência de ponta a ponta de várias maneiras com base em informações parciais coletadas durante o processo de streaming.

Usar StreamingQueryProgress

As métricas a seguir são incluídas no StreamingQueryProgress evento, que é registrado automaticamente nos logs do driver. Você também pode acessá-los através da função de retorno do StreamingQueryListener de onQueryProgress(). QueryProgressEvent.json() ou toString() incluem métricas extras do modo em tempo real.

  1. Latência de processamento (processingLatencyMs). O tempo decorrido entre o momento em que a consulta em tempo real lê um registo e até ser gravado na fase seguinte ou numa etapa subsequente. Para consultas de estágio único, isso mede a mesma duração que a latência E2E. Essa métrica é relatada por tarefa.
  2. Latência de enfileiramento da fonte (sourceQueuingLatencyMs). O tempo decorrido entre o momento em que um registo é gravado com êxito num barramento de mensagens, por exemplo, o tempo de anexação ao log no Kafka, e o momento em que o registo foi lido pela primeira vez pela consulta em modo de tempo real. Essa métrica é relatada por tarefa.
  3. Latência E2E (e2eLatencyMs). O tempo entre o momento em que o registo é gravado com êxito num barramento de mensagens e quando o registo é transferido a jusante pela consulta em modo de tempo real. Essa métrica é agregada por lote em todos os registros processados por todas as tarefas.

Por exemplo:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    },

Usar a Observe API em tarefas

A API do Observe ajuda a medir a latência sem iniciar outro trabalho. Se tiveres um carimbo de data/hora de origem que se aproxima do momento de chegada dos dados de origem e ele for transmitido antes de atingir o coletor, ou se conseguires encontrar uma forma de transmitir o carimbo de data/hora, podes estimar a latência de cada lote utilizando a API Observe:

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

Neste exemplo, um carimbo de data/hora atual é registrado antes de enviar a entrada, e a latência é estimada calculando a diferença entre esse carimbo de data/hora e o carimbo de data/hora de origem do registro. Os resultados são incluídos em relatórios de progresso e disponibilizados aos ouvintes. Aqui está um exemplo de resultado:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

O que é suportado?

Environments

Tipo de cluster Supported
Dedicado (anteriormente: usuário único) Yes
Padrão (anteriormente: compartilhado) No
Lakeflow Spark Oleodutos Declarativos Classic No
Lakeflow Spark Pipelines Declarativos Sem Servidor No
Serverless No

Languages

Linguagem Supported
Scala Yes
Java Yes
Python Yes

Modos de execução

Modo de execução Supported
Modo de atualização Yes
modo de acrescento No
Modo completo No

Sources

Sources Supported
Apache Kafka Yes
AWS MSK Yes
Eventhub (usando o Kafka Connector) Yes
Kinesis Sim (apenas modo EFO)
Google Pub/Sub No
Apache Pulsar No

Sinks

Sinks Supported
Apache Kafka Yes
Eventhub (usando o Kafka Connector) Yes
Kinesis No
Google Pub/Sub No
Apache Pulsar No
Sumidouros arbitrários (usando forEachWriter) Yes

Operators

Operators Supported
Operações sem Estado
  • Selection
Yes
  • Projection
Yes
UDFs
  • Scala UDF
Sim (com algumas limitações)
  • Python UDF
Sim (com algumas limitações)
Aggregation
  • sum
Yes
  • count
Yes
  • max
Yes
  • min
Yes
  • avg
Yes
Funções de agregação Yes
Windowing
  • Tumbling
Yes
  • Sliding
Yes
  • Session
No
Deduplication
  • dropDuplicados
Sim (o estado é ilimitado)
  • dropDuplicatesWithinWatermark
No
Stream - Join de Tabela
  • Tabela de difusão (deverá ser pequena)
Yes
Stream - Stream Junte-se No
(plano)MapGroupsWithState No
transformWithState Sim (com algumas diferenças)
união Sim (com algumas limitações)
paraCada Yes
paraEachBatch No
mapPartições Não (ver limitação)

Utilize transformWithState no modo em tempo real

Para a criação de aplicativos baseados em estado personalizados, o Databricks suporta transformWithState, uma API no Apache Spark Structured Streaming. Consulte Criar um aplicativo com monitoração de estado personalizado para obter mais informações sobre a API e trechos de código.

No entanto, há algumas diferenças entre como a API se comporta no modo de tempo real e as consultas de streaming tradicionais que aproveitam a arquitetura de microlote.

  • O método em tempo handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) real é chamado para cada linha.
    • O inputRows iterador retorna um único valor. No modo de microlote, ele é chamado uma vez para cada chave, e o inputRows iterador retorna todos os valores para uma chave no microlote.
    • Você deve estar ciente dessa diferença ao escrever seu código.
  • Os temporizadores de tempo de evento não são suportados no modo de tempo real.
  • No modo de tempo real, os temporizadores demoram a disparar dependendo da chegada dos dados. Caso contrário, se não houver dados, eles serão acionados no final do lote de longa execução. Por exemplo, se um temporizador deve disparar às 10:00:00 e não há chegada de dados simultaneamente, ele não é acionado. Em vez disso, se os dados chegarem às 10:00:10, o temporizador é acionado com um atraso de 10 segundos. Ou, se nenhum dado chegar e o lote de longa execução estiver sendo encerrado, ele executará o temporizador antes de encerrar o lote de longa execução.

Funções Definidas pelo Utilizador (UDFs) em Python

O Databricks suporta a maioria das funções definidas pelo usuário (UDFs) do Python no modo em tempo real:

Tipo UDF Supported
UDF apátrida
  • UDF escalar Python (link)
Yes
  • Seta escalar UDF
Yes
  • Pandas escalar UDF (link)
Yes
  • Função de seta (mapInArrow)
Yes
  • Função Pandas (link)
Yes
UDF (Agrupamento de Estado) UDF (UDAF)
  • transformWithState (NOTA: apenas Row interface)
Yes
  • applyInPandasWithState
No
UDF de agrupamento sem estado (UDAF)
  • apply
No
  • applyInArrow
No
  • applyInPandas
No
Função Tabela
No
UC UDF No

Há vários pontos a considerar ao usar UDFs Python no modo de tempo real:

  • Para minimizar a latência, configure o tamanho do lote de seta (spark.sql.execution.arrow.maxRecordsPerBatch) como 1.
    • Compensação: essa configuração otimiza a latência em detrimento da taxa de transferência. Para a maioria das cargas de trabalho, essa configuração é recomendada.
    • Aumente o tamanho do lote somente se uma taxa de transferência maior for necessária para acomodar o volume de entrada, aceitando o potencial aumento na latência.
  • Pandas UDFs e funções não funcionam bem com um tamanho de lote de seta de 1.
    • Se você usar pandas UDFs ou funções, defina o tamanho do lote de seta para um valor mais alto (por exemplo, 100 ou superior).
    • Observe que isso implica maior latência. O Databricks recomenda o uso da Seta UDF ou da função, se possível.
  • Devido ao problema de desempenho com pandas, transformWithState só é suportado com a Row interface.

Técnicas de otimização

Technique Ativado por padrão
pt-PT: Acompanhamento assíncrono do progresso: Move a escrita para o log de offset e o log de confirmação para um processo assíncrono, reduzindo o tempo inter-batch entre dois micro-lotes. Isto pode ajudar a reduzir a latência das consultas de streaming sem estado. No
Checkpointing de estado assíncrono: Ajuda a reduzir a latência das consultas de streaming com estado ao começar a processar o próximo micro-batch assim que o cálculo do micro-batch anterior é concluído, sem esperar pelo checkpointing de estado. No

Limitations

Limitação da fonte

Para o Kinesis, o modo de sondagem não é suportado. Além disso, as repartições frequentes podem afetar negativamente a latência.

Limitação da União

Para a União, existem algumas limitações:

  • A auto-união não é suportada:
    • Kafka: Não é possível usar o mesmo objeto de quadro de dados de origem e unir quadros de dados derivados dele. Solução alternativa: utilize diferentes Dataframes que leiam da mesma fonte.
    • Kinesis: Não é possível unir quadros de dados derivados da mesma fonte do Kinesis com a mesma configuração. Solução alternativa: Além de usar Dataframes diferentes, pode atribuir uma opção diferente de 'NomeConsumidor' a cada DataFrame.
  • Os operadores estatais (por exemplo, aggregate, , deduplicatetransformWithState) definidos antes da União não são suportados.
  • Não há suporte para a junção com fontes de lote.

Limitação do MapPartitions

mapPartitions em Scala e APIs semelhantes em Python (mapInPandas, mapInArrow) recebem um iterador da partição inteira de entrada e produzem um iterador da saída inteira com mapeamento arbitrário entre entrada e saída. Estas APIs podem causar problemas de desempenho no Modo Real-Time de Streaming ao bloquear toda a saída, o que aumenta a latência. A semântica destas APIs não suporta bem a propagação de metadados.

Use UDFs escalares combinados com Transform tipos de dados complexos ou filter para obter funcionalidades semelhantes.

Examples

Os exemplos abaixo mostram consultas suportadas.

Consultas sem estado

Todas as consultas sem estado de estágio único ou múltiplo são suportadas.

Fonte de Kafka para destino de Kafka

Neste exemplo, você lê a partir de uma fonte Kafka e escreve em uma pia Kafka.

Python
query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .trigger(realTime="5 minutes")
        .outputMode("update")
        .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Repartição

Neste exemplo, você lê de uma fonte Kafka, reparticiona os dados em 20 partições e grava em um coletor Kafka.

Define a configuração spark.sql.execution.sortBeforeRepartition do Spark para false antes de usar a repartição.

Python
# Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("subscribe", input_topic)
    .option("startingOffsets", "earliest")
    .load()
    .repartition(20)
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

// Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .repartition(20)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Junção de fluxo-instantâneo (transmissão apenas)

Neste exemplo, você lê de Kafka, une os dados com uma tabela estática e grava em um coletor de Kafka. Observe que apenas junções estáticas de fluxo que transmitem a tabela estática são suportadas, o que significa que a tabela estática deve caber na memória.

Python
from pyspark.sql.functions import broadcast, expr

# We assume the static table in the path `stateic_table_location` has a column 'lookupKey'.

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("subscribe", input_topic)
    .option("startingOffsets", "earliest")
    .load()
    .withColumn("joinKey", expr("CAST(value AS STRING)"))
    .join(
        broadcast(spark.read.format("parquet").load(static_table_location)),
        expr("joinKey = lookupKey")
    )
    .selectExpr("value AS key", "value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Fonte Kinesis para o lavatório de Kafka

Neste exemplo, você lê a partir de uma fonte Kinesis e escreve em um coletor Kafka.

Python
query = (
    spark.readStream
        .format("kinesis")
        .option("region", region_name)
        .option("awsAccessKey", aws_access_key_id)
        .option("awsSecretKey", aws_secret_access_key)
        .option("consumerMode", "efo")
        .option("consumerName", consumer_name)
        .load()
        .selectExpr("parttitionKey AS key", "CAST(data AS STRING) AS value")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .trigger(realTime="5 minutes")
        .outputMode("update")
        .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kinesis")
      .option("region", regionName)
      .option("awsAccessKey", awsAccessKeyId)
      .option("awsSecretKey", awsSecretAccessKey)
      .option("consumerMode", "efo")
      .option("consumerName", consumerName)
      .load()
      .select(
        col("partitionKey").alias("key"),
        col("data").cast("string").alias("value")
      )
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Union

Neste exemplo, você une dois DataFrames Kafka de dois tópicos diferentes e grava em um coletor Kafka.

Python
df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_1)
    .load()
)

df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_2)
    .load()
)

query = (
    df1.union(df2)
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val df1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic1)
      .load()

val df2 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic2)
      .load()

df1.union(df2)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Consultas com estado

Deduplication

Python
query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic)
    .load()
    .dropDuplicates(["timestamp", "value"])
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .dropDuplicates("timestamp", "value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Aggregation

Python
from pyspark.sql.functions import col

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic)
    .load()
    .groupBy(col("timestamp"), col("value"))
    .count()
    .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .groupBy(col("timestamp"), col("value"))
      .count()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply("5 minutes"))
      .outputMode(OutputMode.Update())
      .start()

União com Agregação

Neste exemplo, você primeiro une dois Kafka DataFrames de dois tópicos diferentes e, em seguida, faz uma agregação. No final, você escreve para a pia de Kafka.

Python
from pyspark.sql.functions import col

df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_1)
    .load()
)

df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_2)
    .load()
)

query = (
    df1.union(df2)
    .groupBy(col("timestamp"), col("value"))
    .count()
    .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val df1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic1)
      .load()

val df2 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic2)
      .load()

df1.union(df2)
      .groupBy(col("timestamp"), col("value"))
      .count()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

TransformWithState

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import org.apache.spark.sql.streaming.{ListState, MapState, StatefulProcessor, OutputMode, TTLConfig, TimeMode, TimerValues, ValueState}

/**
 * This processor counts the number of records it has seen for each key using state variables
 * with TTLs. It redundantly maintains this count with a value, list, and map state to put load
 * on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
 * the count for a given grouping key.)
 *
 * The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
 * The source-timestamp is passed through so that we can calculate end-to-end latency. The output
 * schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
 *
 */

class RTMStatefulProcessor(ttlConfig: TTLConfig)
  extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
  @transient private var _value: ValueState[Long] = _
  @transient private var _map: MapState[Long, String] = _
  @transient private var _list: ListState[String] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    // Counts the number of records this key has seen
    _value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
    _map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
    _list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, Long)],
      timerValues: TimerValues): Iterator[(String, Long, Long)] = {
    inputRows.map { row =>
      val key = row._1
      val sourceTimestamp = row._2

      val oldValue = _value.get()
      _value.update(oldValue + 1)
      _map.updateValue(oldValue, key)
      _list.appendValue(key)

      (key, oldValue + 1, sourceTimestamp)
    }
  }
}

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
      .as[(String, String, Timestamp)]
      .groupByKey(row => row._1)
      .transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
      .as[(String, Long, Long)]
      .select(
            col("_1").as("key"),
            col("_2").as("value")
      )
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply("5 minutes"))
      .outputMode(OutputMode.Update())
      .start()

Note

Há uma diferença entre como o modo em tempo real e outros modos de execução no Structured Streaming executam o StatefulProcessor in transformWithState. Veja Usar transformWithState no modo de tempo real

TransformWithState (PySpark, interface de linha)

from typing import Iterator, Tuple

from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType


class RTMStatefulProcessor(StatefulProcessor):
  """
  This processor counts the number of records it has seen for each key using state variables
  with TTLs. It redundantly maintains this count with a value, list, and map state to put load
  on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
  the count for a given grouping key.)

  The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
  The source-timestamp is passed through so that we can calculate end-to-end latency. The output
  schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
  """

  def init(self, handle: StatefulProcessorHandle) -> None:
    state_schema = StructType([StructField("value", LongType(), True)])
    self.value_state = handle.getValueState("value", state_schema, 30000)
    map_key_schema = StructType([StructField("key", LongType(), True)])
    map_value_schema = StructType([StructField("value", StringType(), True)])
    self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
    list_schema = StructType([StructField("value", StringType(), True)])
    self.list_state = handle.getListState("list", list_schema, 30000)

  def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
    for row in rows:
      # row is a tuple (key, source_timestamp)
      key_str = row[0]
      source_timestamp = row[1]
      old_value = value.get()
      if old_value is None:
        old_value = 0
      self.value_state.update((old_value + 1,))
      self.map_state.update((old_value,), (key_str,))
      self.list_state.appendValue((key_str,))
      yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)

  def close(self) -> None:
    pass


output_schema = StructType(
  [
    StructField("key", StringType(), True),
    StructField("value", LongType(), True),
    StructField("timestamp", TimestampType(), True),
  ]
)

query = (
  spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker_address)
  .option("subscribe", input_topic)
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
  .groupBy("key")
  .transformWithState(
    statefulProcessor=RTMStatefulProcessor(),
    outputStructType=output_schema,
    outputMode="Update",
    timeMode="processingTime",
  )
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker_address)
  .option("topic", output_topic)
  .option("checkpointLocation", checkpoint_location)
  .trigger(realTime="5 minutes")
  .outputMode("Update")
  .start()
)

Note

Há uma diferença entre como o modo em tempo real e outros modos de execução no Structured Streaming executam o StatefulProcessor in transformWithState. Veja Usar transformWithState no modo de tempo real

Sinks

Escrevendo para o PostgreSQL através de foreachSink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row}

/**
 * Groups connection properties for
 * the JDBC writers.
 *
 * @param url JDBC url of the form jdbc:subprotocol:subname to connect to
 * @param dbtable database table that should be written into
 * @param username username for authentication
 * @param password password for authentication
 */
class JdbcWriterConfig(
    val url: String,
    val dbtable: String,
    val username: String,
    val password: String,
) extends Serializable

/**
 * Handles streaming data writes to a database sink via JDBC, by:
 *   - connecting to the database
 *   - buffering incoming data rows in batches to reduce write overhead
 *
 * @param config connection parameters and configuration knobs for the writer
 */
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
  extends ForeachWriter[Row] with Serializable {
  // The writer currently only supports this hard-coded schema
  private val UPSERT_STATEMENT_SQL =
    s"""MERGE INTO "${config.dbtable}"
       |USING (
       |  SELECT
       |    CAST(? AS INTEGER) AS "id",
       |    CAST(? AS CHARACTER VARYING) AS "data"
       |) AS "source"
       |ON "test"."id" = "source"."id"
       |WHEN MATCHED THEN
       |  UPDATE SET "data" = "source"."data"
       |WHEN NOT MATCHED THEN
       |  INSERT ("id", "data") VALUES ("source"."id", "source"."data")
       |""".stripMargin

  private val MAX_BUFFER_SIZE = 3
  private val buffer = new Array[Row](MAX_BUFFER_SIZE)
  private var bufferSize = 0

  private var connection: Connection = _

  /**
   * Flushes the [[buffer]] by writing all rows in the buffer to the database.
   */
  private def flushBuffer(): Unit = {
    require(connection != null)

    if (bufferSize == 0) {
      return
    }

    var upsertStatement: PreparedStatement = null

    try {
      upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)

      for (i <- 0 until bufferSize) {
        val row = buffer(i)
        upsertStatement.setInt(1, row.getAs[String]("key"))
        upsertStatement.setString(2, row.getAs[String]("value"))
        upsertStatement.addBatch()
      }

      upsertStatement.executeBatch()
      connection.commit()

      bufferSize = 0
    } catch { case e: Exception =>
      if (connection != null) {
        connection.rollback()
      }
      throw e
    } finally {
      if (upsertStatement != null) {
        upsertStatement.close()
      }
    }
  }

  override def open(partitionId: Long, epochId: Long): Boolean = {
    connection = DriverManager.getConnection(config.url, config.username, config.password)
    true
  }

  override def process(row: Row): Unit = {
    buffer(bufferSize) = row
    bufferSize += 1
    if (bufferSize >= MAX_BUFFER_SIZE) {
      flushBuffer()
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    flushBuffer()
    if (connection != null) {
      connection.close()
      connection = null
    }
  }
}


spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .writeStream
      .outputMode(OutputMode.Update())
      .trigger(defaultTrigger)
      .foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
      .start()

Display

Important

Esta funcionalidade está disponível no Databricks Runtime 17.1 e posteriores.

Fonte da taxa de exibição

Neste exemplo, lê-se de uma fonte de taxas e exibe-se o DataFrame de streaming num notebook.

Python
inputDF = (
  spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode

val inputDF = spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())