Usar o modo em tempo real em Lakeflow Spark Declarative Pipelines

Importante

O modo em tempo real no Lakeflow Spark Declarative Pipelines está em Pré-visualização pública na versão 18.1.2 do Databricks Runtime, no canal de pré-visualização.

O modo em tempo real permite o processamento de dados com latência ultra-baixa, com latência de ponta a ponta tão baixa quanto cinco milissegundos. Use o modo em tempo real para cargas de trabalho operacionais que requerem resposta imediata a dados em fluxo, como deteção de fraude e personalização em tempo real.

O modo em tempo real também está disponível diretamente em Structured Streaming, fora dos pipelines. Consulte Modo em tempo real em Streaming estruturado.

Como o modo em tempo real alcança baixa latência

O modo em tempo real difere do processamento contínuo padrão em três aspetos principais:

  • Batches de longa duração: O sistema processa os dados à medida que ficam disponíveis na fonte dentro de lotes de longa duração (o padrão é cinco minutos).
  • Escalonamento simultâneo de etapas: Todas as fases de consulta são agendadas ao mesmo tempo. O recurso de computação deve ter espaços suficientes para tarefas disponíveis para cobrir todas as fases em simultâneo. Veja Dimensionamento computacional.
  • Streaming shuffle: Os dados são passados entre as fases assim que são produzidos, em vez de esperar que uma etapa a montante seja concluída antes de iniciar a fase a jusante.

O intervalo de checkpoint (configurado através de pipelines.trigger.interval) controla com que frequência o estado e os offsets da origem são persistidos em armazenamento duradouro. Intervalos mais longos reduzem a sobrecarga dos checkpoints, mas aumentam o tempo de recuperação após uma falha e atrasam o relatório das métricas. Intervalos mais curtos melhoram a durabilidade, mas acrescentam sobrecarga.

Modo em tempo real e canais de processamento contínuos

O modo em tempo real é um tipo especializado de gatilho contínuo. O modo contínuo continua a ser necessário — o modo em tempo real adiciona otimizações de latência a nível de fluxo. Para usar o modo em tempo real, o pipeline deve primeiro funcionar em modo contínuo. O modo em tempo real aplica então otimizações adicionais ao nível do fluxo para alcançar uma latência inferior a um segundo para além do que o processamento contínuo padrão proporciona.

Ativar o modo em tempo real requer três passos de configuração:

  1. Define o pipeline para modo contínuo.
  2. Ativar o modo em tempo real ao nível do pipeline.
  3. Defina um fluxo de atualização em tempo real.

Requirements

Requirement Value
Databricks Runtime 18.1.2 no canal de pré-visualização do SDP
Tipo de computação Computação clássica ou computação sem servidor

Configurar o modo em tempo real

Passo 1: Definir o pipeline para modo contínuo

Nas definições do pipeline, defina o modo Pipeline para Contínuo, ou defina-o no pipeline JSON:

{
  "continuous": true
}

Passo 2: Ativar o modo em tempo real ao nível do pipeline

Nas definições do teu pipeline, adiciona a chave seguinte à configuração do Spark em Configuração avançada > do Spark:

spark.databricks.streaming.realTimeMode.enabled = true

Também pode definir isto no pipeline JSON:

{
  "continuous": true,
  "spark_conf": {
    "spark.databricks.streaming.realTimeMode.enabled": "true"
  }
}

Passo 3: Defina um fluxo de atualização em tempo real

O modo em tempo real requer um fluxo de atualizações. Use dp.create_sink() para definir o alvo de saída, depois use o @dp.update_flow decorador com pipelines.trigger definido para "RealTime" e target apontando para o lava-loiça.

from pyspark import pipelines as dp

# Define the output sink
dp.create_sink(
    "my_kafka_sink",
    "kafka",
    {
        "kafka.bootstrap.servers": "<bootstrap-servers>",
        "topic": "<output-topic>",
    }
)

# Define the real-time update flow targeting the sink
@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",  # optional; defaults to 5 minutes
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "<bootstrap-servers>")
            .option("subscribe", "<input-topic>")
            .load()
    )

Parâmetros de configuração ao nível de fluxo:

Parâmetro Required Predefinido Description
pipelines.trigger Yes Defina como "RealTime" para ativar o modo de tempo real para este fluxo.
pipelines.trigger.interval No "5 minutes" Intervalo de ponto de controlo. Controla com que frequência o estado e os offsets são guardados. Valores mais curtos melhoram a recuperabilidade; valores mais longos reduzem a sobrecarga.

Exemplos de código

Kafka para Kafka

Leia a partir de um tópico Kafka e escreva para um destino de saída Kafka:

from pyspark import pipelines as dp

dp.create_sink("kafka_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="kafka_rtm_flow",
    target="kafka_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def kafka_rtm_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .option("startingOffsets", "latest")
            .load()
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    )

Enriquecer com uma junção de transmissão

Combine um fluxo do Kafka com uma tabela de pesquisa estática. Apenas as junções broadcast (stream-to-static) são suportadas. As associações entre fluxos não são suportadas no modo em tempo real.

from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr

dp.create_sink("enriched_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": enriched_output_topic,
})

@dp.update_flow(
    name="enriched_events_flow",
    target="enriched_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def enriched_events():
    lookup = spark.read.table("catalog.schema.lookup_table")
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .withColumn("event_key", expr("CAST(value AS STRING)"))
            .join(broadcast(lookup), expr("event_key = lookup_key"))
            .select("event_key", "lookup_value", "timestamp")
    )

Agregação

Contar eventos por chave usando um estado groupBy. Defina spark.sql.shuffle.partitions para corresponder ao número de partições de entrada para operações com estado:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
        "spark.sql.shuffle.partitions": "8",
    }
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
            .groupBy(col("event_type"))
            .count()
    )

Fontes e sumidouros suportados

Connector Como fonte Como sumidouro Notes
Apache Kafka
AWS MSK Utiliza a interface compatível com Kafka.
Hubs de Eventos do Azure (Kafka connector) Utiliza a interface compatível com Kafka.
Amazon Kinesis Não suportado Utilize apenas no modo EFO (Enhanced Fan-Out).
Delta Não suportado Não suportado

Dimensionamento de recursos de computação

Pode executar um pipeline em tempo real por recurso computacional, se o recurso computacional tiver slots de tarefa suficientes. Os espaços disponíveis para tarefas devem cobrir todas as tarefas em todas as fases de consulta.

Tipo de pipeline Configuration Campos de tarefa obrigatórios
Sem estado de estágio único (fonte + sumidouro Kafka) maxPartitions = 8 8
Com estado em dois estágios (fonte Kafka + embaralhamento) maxPartitions = 8, partições de shuffle = 20 28 (8 + 20)
Três etapas (origem Kafka + duas redistribuições) maxPartitions = 8, dois estágios de embaralhamento de 20 cada 48 (8 + 20 + 20)

Se não definires maxPartitions, usa o número de partições no tópico Kafka.

Apoio ao operador

Categoria Operador Suportado
Sem estado Seleção, Projeção
UDFs Scala UDF ✓ (com limitações)
UDFs Python UDF ✓ (com limitações)
Agregação Suma, Contagem, Máximo, Mínimo, Média
Windowing Cambalhota, Deslizamento
Windowing Sessão Não suportado
Deduplication dropDuplicates ✓ (estado ilimitado)
Deduplication dropDuplicatesWithinWatermark Não suportado
Joins Junção da mesa de transmissão
Joins Junção de fluxo a fluxo Não suportado
Personalizado transformWithState ✓ (com diferenças comportamentais)
Personalizado union ✓ (com limitações)
Personalizado forEach Não suportado
Personalizado flatMapGroupsWithState Não suportado
Personalizado mapPartitions Não suportado
Personalizado forEachBatch Não suportado

transformWithState em modo de tempo real

transformWithState é suportado em modo em tempo real, com as seguintes diferenças em relação ao processamento micro-batch:

  • handleInputRows é invocado uma vez por linha em vez de uma vez por tecla por lote. O inputRows iterador produz um único valor por invocação.
  • Não são suportados temporizadores de tempo de evento. Os temporizadores de tempo de processamento disparam quando um lote de longa duração termina se não tiver chegado nenhum dado.
  • transformWithStateInPandas não é suportado.

Pandas UDFs em modo de tempo real

Para minimizar a latência com os UDFs do pandas, defina spark.sql.execution.arrow.maxRecordsPerBatch para 1. Isto otimiza a latência à custa do rendimento. Se a taxa de transferência também for importante, defina esse valor igual 100 ou superior.

Monitorizar o desempenho em modo em tempo real

O modo em tempo real expõe métricas de latência em StreamingQueryProgress debaixo do latencies campo. Aceda a estas métricas através de StreamingQueryListener ou inspecionando a propriedade lastProgress na consulta de streaming.

Métrica Description
processingLatencyMs Tempo entre o momento em que um registo é lido pelo fluxo e o momento em que é totalmente processado pelo fluxo
sourceQueuingLatencyMs Tempo entre o momento em que um registo é gravado com êxito no barramento de mensagens (por exemplo, o instante de acrescento ao registo no Kafka) e o momento em que é lido pela primeira vez pelo fluxo
e2eLatencyMs Latência total de ponta a ponta desde o momento em que o registo é produzido na origem até ao momento em que é totalmente processado pelo fluxo

Cada métrica é apresentada nos percentis p50, p90, p95 e p99.

Limitações

Recomenda-se um fluxo em tempo real por pipeline. São permitidos vários fluxos, mas a contenção de slots de tarefas entre vários fluxos aumenta a latência.

Para uma lista completa das limitações do operador e da fonte, veja Limitações do modo em tempo real.

Recursos adicionais