Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Importante
O modo em tempo real no Lakeflow Spark Declarative Pipelines está em Prévia Pública no Databricks Runtime 18.1.2 no canal de prévia.
O modo em tempo real permite o processamento de dados de latência ultra baixa, com latência de ponta a ponta tão baixa quanto cinco milissegundos. Use o modo em tempo real para cargas de trabalho operacionais que exigem resposta imediata aos dados de streaming, como detecção de fraude e personalização em tempo real.
O modo em tempo real também está disponível diretamente no Streaming Estruturado fora dos pipelines. Consulte o modo em tempo real no Streaming Estruturado.
Como o modo em tempo real obtém baixa latência
O modo em tempo real difere do processamento contínuo padrão de três maneiras principais:
- Lotes de execução longa: o sistema processa dados à medida que ficam disponíveis na origem em lotes de execução longa (o padrão é cinco minutos).
- Agendamento simultâneo de etapas: todas as etapas da consulta são agendadas ao mesmo tempo. O recurso de computação deve ter slots de tarefa disponíveis suficientes para abranger todos os estágios simultaneamente. Consulte o dimensionamento da computação.
- Streaming shuffle: os dados são transferidos entre os estágios assim que são produzidos, em vez de esperar que um estágio anterior seja concluído antes de iniciar o estágio seguinte.
O intervalo de checkpoint (configurado por meio de pipelines.trigger.interval) controla com que frequência o estado e os offsets da origem são gravados em armazenamento durável. Intervalos mais longos reduzem a sobrecarga de ponto de verificação, mas aumentam o tempo de recuperação após uma falha e atrasam os relatórios de métricas. Intervalos mais curtos melhoram a durabilidade, mas adicionam sobrecarga.
Modo em tempo real e fluxos contínuos
O modo em tempo real é um tipo especializado de gatilho contínuo. O modo contínuo ainda é necessário – o modo em tempo real adiciona otimizações de latência no nível de fluxo na parte superior. Para usar o modo em tempo real, o pipeline deve primeiro ser executado no modo contínuo. Em seguida, o modo em tempo real aplica otimizações adicionais no nível de fluxo para obter latência de sub-segundo além do que o processamento contínuo padrão fornece.
Habilitar o modo em tempo real requer três etapas de configuração:
- Defina o pipeline como modo contínuo.
- Habilite o modo em tempo real no nível do pipeline.
- Defina um fluxo de atualização em tempo real.
Requirements
| Requirement | Value |
|---|---|
| Databricks Runtime | 18.1.2 no canal de visualização do SDP |
| Tipo de computação | Computação clássica ou sem servidor |
Configurar o modo em tempo real
Etapa 1: Definir o pipeline como modo contínuo
Nas configurações do pipeline, defina o modo Pipeline como Contínuo ou defina-o no JSON do pipeline:
{
"continuous": true
}
Etapa 2: Habilitar o modo em tempo real no nível do pipeline
Nas configurações do pipeline, adicione a seguinte chave à configuração do Spark em Configuração avançada > do Spark:
spark.databricks.streaming.realTimeMode.enabled = true
Você também pode definir isso no JSON do pipeline:
{
"continuous": true,
"spark_conf": {
"spark.databricks.streaming.realTimeMode.enabled": "true"
}
}
Etapa 3: Definir um fluxo de atualização em tempo real
O modo em tempo real requer um fluxo de atualização. Use dp.create_sink() para definir o destino de saída e, em seguida, use o decorador @dp.update_flow com pipelines.trigger definido como "RealTime" e target apontando para o sink.
from pyspark import pipelines as dp
# Define the output sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "<bootstrap-servers>",
"topic": "<output-topic>",
}
)
# Define the real-time update flow targeting the sink
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes", # optional; defaults to 5 minutes
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-servers>")
.option("subscribe", "<input-topic>")
.load()
)
Parâmetros de configuração de nível de fluxo:
| Parâmetro | Obrigatório | Default | Description |
|---|---|---|---|
pipelines.trigger |
Yes | — | Defina como "RealTime" para ativar o modo em tempo real para esse fluxo. |
pipelines.trigger.interval |
No | "5 minutes" |
Intervalo de ponto de verificação. Controla com que frequência o estado e os offsets são gravados. Valores mais curtos melhoram a capacidade de recuperação; valores mais longos reduzem a sobrecarga. |
Exemplos de código
Kafka para Kafka
Leia um tópico do Kafka e escreva em um destino de saída do Kafka:
from pyspark import pipelines as dp
dp.create_sink("kafka_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="kafka_rtm_flow",
target="kafka_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def kafka_rtm_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
)
Enriquecer com uma junção de difusão
Faça a junção de um stream do Kafka com uma tabela de consulta estática. Há suporte apenas para junções de transmissão (stream-to-static). Não há suporte para junções stream-to-stream no modo em tempo real.
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr
dp.create_sink("enriched_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": enriched_output_topic,
})
@dp.update_flow(
name="enriched_events_flow",
target="enriched_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def enriched_events():
lookup = spark.read.table("catalog.schema.lookup_table")
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.withColumn("event_key", expr("CAST(value AS STRING)"))
.join(broadcast(lookup), expr("event_key = lookup_key"))
.select("event_key", "lookup_value", "timestamp")
)
Agregação
Contar eventos por chave usando um estado groupBy. Defina spark.sql.shuffle.partitions para corresponder ao número de partições de entrada para operações com estado:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
"spark.sql.shuffle.partitions": "8",
}
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
.groupBy(col("event_type"))
.count()
)
Fontes e coletores com suporte
| Conector | Como fonte | Como coletor | Anotações |
|---|---|---|---|
| Apache Kafka | ✓ | ✓ | — |
| AWS MSK | ✓ | ✓ | Usa a interface compatível com Kafka. |
| Hubs de Eventos do Azure (conector Kafka) | ✓ | ✓ | Usa a interface compatível com Kafka. |
| Amazon Kinesis | ✓ | Sem suporte | Use apenas no modo EFO (Fan-Out Aprimorado). |
| Delta | Sem suporte | Sem suporte | — |
Dimensionamento de computação
Você pode executar um pipeline em tempo real para cada recurso de computação, se o recurso de computação tiver slots para tarefas suficientes. Os slots de tarefa disponíveis devem abranger todas as tarefas em todos os estágios de consulta.
| Tipo de pipeline | Configuration | Slots de tarefa necessários |
|---|---|---|
| Sem estado de estágio único (origem kafka + coletor) |
maxPartitions = 8 |
8 |
| Estado com dois estágios (fonte Kafka + embaralhamento) |
maxPartitions = 8, partições de embaralhamento = 20 |
28 (8 + 20) |
| Três estágios (origem Kafka + duas reorganizações) |
maxPartitions = 8, duas fases de shuffle de 20 cada um |
48 (8 + 20 + 20) |
Se você não definir maxPartitions, use o número de partições no tópico Kafka.
Suporte ao operador
| Categoria | Operador | Supported |
|---|---|---|
| Sem estado | Seleção, Projeção | ✓ |
| UDFs | Scala UDF | ✓ (com limitações) |
| UDFs | Python UDF | ✓ (com limitações) |
| Agregação | soma, contagem, máximo, mínimo, média | ✓ |
| Windowing | Caindo, deslizando | ✓ |
| Windowing | Session | Sem suporte |
| Deduplicação | dropDuplicates |
✓ (estado ilimitado) |
| Deduplicação | dropDuplicatesWithinWatermark |
Sem suporte |
| Joins | Junção de tabela de difusão | ✓ |
| Joins | Associação entre fluxos | Sem suporte |
| Personalizado | transformWithState |
✓ (com diferenças comportamentais) |
| Personalizado | union |
✓ (com limitações) |
| Personalizado | forEach |
Sem suporte |
| Personalizado | flatMapGroupsWithState |
Sem suporte |
| Personalizado | mapPartitions |
Sem suporte |
| Personalizado | forEachBatch |
Sem suporte |
transformWithState no modo em tempo real
transformWithState é compatível com o modo em tempo real, com as seguintes diferenças em relação ao processamento em micro-lotes:
-
handleInputRowsé invocado uma vez por linha em vez de uma vez por chave por lote. OinputRowsiterador produz um único valor por invocação. - Não há suporte para temporizadores de evento. Os temporizadores de tempo de processamento são acionados quando um lote de longa duração termina, caso nenhum dado tenha chegado.
- Não há suporte para
transformWithStateInPandas.
UDFs do Pandas no modo em tempo real
Para minimizar a latência com UDFs do pandas, defina spark.sql.execution.arrow.maxRecordsPerBatch como 1. Isso reduz a latência em detrimento da taxa de transferência. Se a taxa de transferência também for importante, defina esse valor como 100 ou superior.
Monitorar o desempenho do modo em tempo real
O modo em tempo real exibe as métricas de latência em StreamingQueryProgress, no campo latencies. Acesse essas métricas via StreamingQueryListener ou inspecionando a propriedade lastProgress na consulta de streaming.
| Métrica | Description |
|---|---|
processingLatencyMs |
Tempo entre quando um registro é lido pelo fluxo e quando ele é totalmente processado pelo fluxo |
sourceQueuingLatencyMs |
Tempo entre o momento em que um registro é gravado com sucesso no barramento de mensagens (por exemplo, horário de acréscimo ao log no Kafka) e o momento em que é lido pela primeira vez pelo fluxo |
e2eLatencyMs |
Latência total de ponta a ponta de quando o registro é produzido na origem até quando é totalmente processado pelo fluxo |
Cada métrica é relatada como percentil p50, p90, p95 e p99.
Limitações
É recomendável um fluxo em tempo real por pipeline. Vários fluxos são permitidos, mas a contenção de slot de tarefa entre fluxos aumenta a latência.
Para obter uma lista completa de limitações de operador e de origem, consulte as limitações do modo em tempo real.