Usar o modo em tempo real no Lakeflow Spark Declarative Pipelines

Importante

O modo em tempo real no Lakeflow Spark Declarative Pipelines está em Prévia Pública no Databricks Runtime 18.1.2 no canal de prévia.

O modo em tempo real permite o processamento de dados de latência ultra baixa, com latência de ponta a ponta tão baixa quanto cinco milissegundos. Use o modo em tempo real para cargas de trabalho operacionais que exigem resposta imediata aos dados de streaming, como detecção de fraude e personalização em tempo real.

O modo em tempo real também está disponível diretamente no Streaming Estruturado fora dos pipelines. Consulte o modo em tempo real no Streaming Estruturado.

Como o modo em tempo real obtém baixa latência

O modo em tempo real difere do processamento contínuo padrão de três maneiras principais:

  • Lotes de execução longa: o sistema processa dados à medida que ficam disponíveis na origem em lotes de execução longa (o padrão é cinco minutos).
  • Agendamento simultâneo de etapas: todas as etapas da consulta são agendadas ao mesmo tempo. O recurso de computação deve ter slots de tarefa disponíveis suficientes para abranger todos os estágios simultaneamente. Consulte o dimensionamento da computação.
  • Streaming shuffle: os dados são transferidos entre os estágios assim que são produzidos, em vez de esperar que um estágio anterior seja concluído antes de iniciar o estágio seguinte.

O intervalo de checkpoint (configurado por meio de pipelines.trigger.interval) controla com que frequência o estado e os offsets da origem são gravados em armazenamento durável. Intervalos mais longos reduzem a sobrecarga de ponto de verificação, mas aumentam o tempo de recuperação após uma falha e atrasam os relatórios de métricas. Intervalos mais curtos melhoram a durabilidade, mas adicionam sobrecarga.

Modo em tempo real e fluxos contínuos

O modo em tempo real é um tipo especializado de gatilho contínuo. O modo contínuo ainda é necessário – o modo em tempo real adiciona otimizações de latência no nível de fluxo na parte superior. Para usar o modo em tempo real, o pipeline deve primeiro ser executado no modo contínuo. Em seguida, o modo em tempo real aplica otimizações adicionais no nível de fluxo para obter latência de sub-segundo além do que o processamento contínuo padrão fornece.

Habilitar o modo em tempo real requer três etapas de configuração:

  1. Defina o pipeline como modo contínuo.
  2. Habilite o modo em tempo real no nível do pipeline.
  3. Defina um fluxo de atualização em tempo real.

Requirements

Requirement Value
Databricks Runtime 18.1.2 no canal de visualização do SDP
Tipo de computação Computação clássica ou sem servidor

Configurar o modo em tempo real

Etapa 1: Definir o pipeline como modo contínuo

Nas configurações do pipeline, defina o modo Pipeline como Contínuo ou defina-o no JSON do pipeline:

{
  "continuous": true
}

Etapa 2: Habilitar o modo em tempo real no nível do pipeline

Nas configurações do pipeline, adicione a seguinte chave à configuração do Spark em Configuração avançada > do Spark:

spark.databricks.streaming.realTimeMode.enabled = true

Você também pode definir isso no JSON do pipeline:

{
  "continuous": true,
  "spark_conf": {
    "spark.databricks.streaming.realTimeMode.enabled": "true"
  }
}

Etapa 3: Definir um fluxo de atualização em tempo real

O modo em tempo real requer um fluxo de atualização. Use dp.create_sink() para definir o destino de saída e, em seguida, use o decorador @dp.update_flow com pipelines.trigger definido como "RealTime" e target apontando para o sink.

from pyspark import pipelines as dp

# Define the output sink
dp.create_sink(
    "my_kafka_sink",
    "kafka",
    {
        "kafka.bootstrap.servers": "<bootstrap-servers>",
        "topic": "<output-topic>",
    }
)

# Define the real-time update flow targeting the sink
@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",  # optional; defaults to 5 minutes
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "<bootstrap-servers>")
            .option("subscribe", "<input-topic>")
            .load()
    )

Parâmetros de configuração de nível de fluxo:

Parâmetro Obrigatório Default Description
pipelines.trigger Yes Defina como "RealTime" para ativar o modo em tempo real para esse fluxo.
pipelines.trigger.interval No "5 minutes" Intervalo de ponto de verificação. Controla com que frequência o estado e os offsets são gravados. Valores mais curtos melhoram a capacidade de recuperação; valores mais longos reduzem a sobrecarga.

Exemplos de código

Kafka para Kafka

Leia um tópico do Kafka e escreva em um destino de saída do Kafka:

from pyspark import pipelines as dp

dp.create_sink("kafka_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="kafka_rtm_flow",
    target="kafka_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def kafka_rtm_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .option("startingOffsets", "latest")
            .load()
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    )

Enriquecer com uma junção de difusão

Faça a junção de um stream do Kafka com uma tabela de consulta estática. Há suporte apenas para junções de transmissão (stream-to-static). Não há suporte para junções stream-to-stream no modo em tempo real.

from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr

dp.create_sink("enriched_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": enriched_output_topic,
})

@dp.update_flow(
    name="enriched_events_flow",
    target="enriched_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def enriched_events():
    lookup = spark.read.table("catalog.schema.lookup_table")
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .withColumn("event_key", expr("CAST(value AS STRING)"))
            .join(broadcast(lookup), expr("event_key = lookup_key"))
            .select("event_key", "lookup_value", "timestamp")
    )

Agregação

Contar eventos por chave usando um estado groupBy. Defina spark.sql.shuffle.partitions para corresponder ao número de partições de entrada para operações com estado:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
        "spark.sql.shuffle.partitions": "8",
    }
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
            .groupBy(col("event_type"))
            .count()
    )

Fontes e coletores com suporte

Conector Como fonte Como coletor Anotações
Apache Kafka
AWS MSK Usa a interface compatível com Kafka.
Hubs de Eventos do Azure (conector Kafka) Usa a interface compatível com Kafka.
Amazon Kinesis Sem suporte Use apenas no modo EFO (Fan-Out Aprimorado).
Delta Sem suporte Sem suporte

Dimensionamento de computação

Você pode executar um pipeline em tempo real para cada recurso de computação, se o recurso de computação tiver slots para tarefas suficientes. Os slots de tarefa disponíveis devem abranger todas as tarefas em todos os estágios de consulta.

Tipo de pipeline Configuration Slots de tarefa necessários
Sem estado de estágio único (origem kafka + coletor) maxPartitions = 8 8
Estado com dois estágios (fonte Kafka + embaralhamento) maxPartitions = 8, partições de embaralhamento = 20 28 (8 + 20)
Três estágios (origem Kafka + duas reorganizações) maxPartitions = 8, duas fases de shuffle de 20 cada um 48 (8 + 20 + 20)

Se você não definir maxPartitions, use o número de partições no tópico Kafka.

Suporte ao operador

Categoria Operador Supported
Sem estado Seleção, Projeção
UDFs Scala UDF ✓ (com limitações)
UDFs Python UDF ✓ (com limitações)
Agregação soma, contagem, máximo, mínimo, média
Windowing Caindo, deslizando
Windowing Session Sem suporte
Deduplicação dropDuplicates ✓ (estado ilimitado)
Deduplicação dropDuplicatesWithinWatermark Sem suporte
Joins Junção de tabela de difusão
Joins Associação entre fluxos Sem suporte
Personalizado transformWithState ✓ (com diferenças comportamentais)
Personalizado union ✓ (com limitações)
Personalizado forEach Sem suporte
Personalizado flatMapGroupsWithState Sem suporte
Personalizado mapPartitions Sem suporte
Personalizado forEachBatch Sem suporte

transformWithState no modo em tempo real

transformWithState é compatível com o modo em tempo real, com as seguintes diferenças em relação ao processamento em micro-lotes:

  • handleInputRows é invocado uma vez por linha em vez de uma vez por chave por lote. O inputRows iterador produz um único valor por invocação.
  • Não há suporte para temporizadores de evento. Os temporizadores de tempo de processamento são acionados quando um lote de longa duração termina, caso nenhum dado tenha chegado.
  • Não há suporte para transformWithStateInPandas.

UDFs do Pandas no modo em tempo real

Para minimizar a latência com UDFs do pandas, defina spark.sql.execution.arrow.maxRecordsPerBatch como 1. Isso reduz a latência em detrimento da taxa de transferência. Se a taxa de transferência também for importante, defina esse valor como 100 ou superior.

Monitorar o desempenho do modo em tempo real

O modo em tempo real exibe as métricas de latência em StreamingQueryProgress, no campo latencies. Acesse essas métricas via StreamingQueryListener ou inspecionando a propriedade lastProgress na consulta de streaming.

Métrica Description
processingLatencyMs Tempo entre quando um registro é lido pelo fluxo e quando ele é totalmente processado pelo fluxo
sourceQueuingLatencyMs Tempo entre o momento em que um registro é gravado com sucesso no barramento de mensagens (por exemplo, horário de acréscimo ao log no Kafka) e o momento em que é lido pela primeira vez pelo fluxo
e2eLatencyMs Latência total de ponta a ponta de quando o registro é produzido na origem até quando é totalmente processado pelo fluxo

Cada métrica é relatada como percentil p50, p90, p95 e p99.

Limitações

É recomendável um fluxo em tempo real por pipeline. Vários fluxos são permitidos, mas a contenção de slot de tarefa entre fluxos aumenta a latência.

Para obter uma lista completa de limitações de operador e de origem, consulte as limitações do modo em tempo real.

Recursos adicionais