Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Importante
O modo em tempo real no Lakeflow Spark Declarative Pipelines está em Pré-visualização pública na versão 18.1.2 do Databricks Runtime, no canal de pré-visualização.
O modo em tempo real permite o processamento de dados com latência ultra-baixa, com latência de ponta a ponta tão baixa quanto cinco milissegundos. Use o modo em tempo real para cargas de trabalho operacionais que requerem resposta imediata a dados em fluxo, como deteção de fraude e personalização em tempo real.
O modo em tempo real também está disponível diretamente em Structured Streaming, fora dos pipelines. Consulte Modo em tempo real em Streaming estruturado.
Como o modo em tempo real alcança baixa latência
O modo em tempo real difere do processamento contínuo padrão em três aspetos principais:
- Batches de longa duração: O sistema processa os dados à medida que ficam disponíveis na fonte dentro de lotes de longa duração (o padrão é cinco minutos).
- Escalonamento simultâneo de etapas: Todas as fases de consulta são agendadas ao mesmo tempo. O recurso de computação deve ter espaços suficientes para tarefas disponíveis para cobrir todas as fases em simultâneo. Veja Dimensionamento computacional.
- Streaming shuffle: Os dados são passados entre as fases assim que são produzidos, em vez de esperar que uma etapa a montante seja concluída antes de iniciar a fase a jusante.
O intervalo de checkpoint (configurado através de pipelines.trigger.interval) controla com que frequência o estado e os offsets da origem são persistidos em armazenamento duradouro. Intervalos mais longos reduzem a sobrecarga dos checkpoints, mas aumentam o tempo de recuperação após uma falha e atrasam o relatório das métricas. Intervalos mais curtos melhoram a durabilidade, mas acrescentam sobrecarga.
Modo em tempo real e canais de processamento contínuos
O modo em tempo real é um tipo especializado de gatilho contínuo. O modo contínuo continua a ser necessário — o modo em tempo real adiciona otimizações de latência a nível de fluxo. Para usar o modo em tempo real, o pipeline deve primeiro funcionar em modo contínuo. O modo em tempo real aplica então otimizações adicionais ao nível do fluxo para alcançar uma latência inferior a um segundo para além do que o processamento contínuo padrão proporciona.
Ativar o modo em tempo real requer três passos de configuração:
- Define o pipeline para modo contínuo.
- Ativar o modo em tempo real ao nível do pipeline.
- Defina um fluxo de atualização em tempo real.
Requirements
| Requirement | Value |
|---|---|
| Databricks Runtime | 18.1.2 no canal de pré-visualização do SDP |
| Tipo de computação | Computação clássica ou computação sem servidor |
Configurar o modo em tempo real
Passo 1: Definir o pipeline para modo contínuo
Nas definições do pipeline, defina o modo Pipeline para Contínuo, ou defina-o no pipeline JSON:
{
"continuous": true
}
Passo 2: Ativar o modo em tempo real ao nível do pipeline
Nas definições do teu pipeline, adiciona a chave seguinte à configuração do Spark em Configuração avançada > do Spark:
spark.databricks.streaming.realTimeMode.enabled = true
Também pode definir isto no pipeline JSON:
{
"continuous": true,
"spark_conf": {
"spark.databricks.streaming.realTimeMode.enabled": "true"
}
}
Passo 3: Defina um fluxo de atualização em tempo real
O modo em tempo real requer um fluxo de atualizações. Use dp.create_sink() para definir o alvo de saída, depois use o @dp.update_flow decorador com pipelines.trigger definido para "RealTime" e target apontando para o lava-loiça.
from pyspark import pipelines as dp
# Define the output sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "<bootstrap-servers>",
"topic": "<output-topic>",
}
)
# Define the real-time update flow targeting the sink
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes", # optional; defaults to 5 minutes
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-servers>")
.option("subscribe", "<input-topic>")
.load()
)
Parâmetros de configuração ao nível de fluxo:
| Parâmetro | Required | Predefinido | Description |
|---|---|---|---|
pipelines.trigger |
Yes | — | Defina como "RealTime" para ativar o modo de tempo real para este fluxo. |
pipelines.trigger.interval |
No | "5 minutes" |
Intervalo de ponto de controlo. Controla com que frequência o estado e os offsets são guardados. Valores mais curtos melhoram a recuperabilidade; valores mais longos reduzem a sobrecarga. |
Exemplos de código
Kafka para Kafka
Leia a partir de um tópico Kafka e escreva para um destino de saída Kafka:
from pyspark import pipelines as dp
dp.create_sink("kafka_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="kafka_rtm_flow",
target="kafka_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def kafka_rtm_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
)
Enriquecer com uma junção de transmissão
Combine um fluxo do Kafka com uma tabela de pesquisa estática. Apenas as junções broadcast (stream-to-static) são suportadas. As associações entre fluxos não são suportadas no modo em tempo real.
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr
dp.create_sink("enriched_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": enriched_output_topic,
})
@dp.update_flow(
name="enriched_events_flow",
target="enriched_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def enriched_events():
lookup = spark.read.table("catalog.schema.lookup_table")
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.withColumn("event_key", expr("CAST(value AS STRING)"))
.join(broadcast(lookup), expr("event_key = lookup_key"))
.select("event_key", "lookup_value", "timestamp")
)
Agregação
Contar eventos por chave usando um estado groupBy. Defina spark.sql.shuffle.partitions para corresponder ao número de partições de entrada para operações com estado:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
"spark.sql.shuffle.partitions": "8",
}
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
.groupBy(col("event_type"))
.count()
)
Fontes e sumidouros suportados
| Connector | Como fonte | Como sumidouro | Notes |
|---|---|---|---|
| Apache Kafka | ✓ | ✓ | — |
| AWS MSK | ✓ | ✓ | Utiliza a interface compatível com Kafka. |
| Hubs de Eventos do Azure (Kafka connector) | ✓ | ✓ | Utiliza a interface compatível com Kafka. |
| Amazon Kinesis | ✓ | Não suportado | Utilize apenas no modo EFO (Enhanced Fan-Out). |
| Delta | Não suportado | Não suportado | — |
Dimensionamento de recursos de computação
Pode executar um pipeline em tempo real por recurso computacional, se o recurso computacional tiver slots de tarefa suficientes. Os espaços disponíveis para tarefas devem cobrir todas as tarefas em todas as fases de consulta.
| Tipo de pipeline | Configuration | Campos de tarefa obrigatórios |
|---|---|---|
| Sem estado de estágio único (fonte + sumidouro Kafka) |
maxPartitions = 8 |
8 |
| Com estado em dois estágios (fonte Kafka + embaralhamento) |
maxPartitions = 8, partições de shuffle = 20 |
28 (8 + 20) |
| Três etapas (origem Kafka + duas redistribuições) |
maxPartitions = 8, dois estágios de embaralhamento de 20 cada |
48 (8 + 20 + 20) |
Se não definires maxPartitions, usa o número de partições no tópico Kafka.
Apoio ao operador
| Categoria | Operador | Suportado |
|---|---|---|
| Sem estado | Seleção, Projeção | ✓ |
| UDFs | Scala UDF | ✓ (com limitações) |
| UDFs | Python UDF | ✓ (com limitações) |
| Agregação | Suma, Contagem, Máximo, Mínimo, Média | ✓ |
| Windowing | Cambalhota, Deslizamento | ✓ |
| Windowing | Sessão | Não suportado |
| Deduplication | dropDuplicates |
✓ (estado ilimitado) |
| Deduplication | dropDuplicatesWithinWatermark |
Não suportado |
| Joins | Junção da mesa de transmissão | ✓ |
| Joins | Junção de fluxo a fluxo | Não suportado |
| Personalizado | transformWithState |
✓ (com diferenças comportamentais) |
| Personalizado | union |
✓ (com limitações) |
| Personalizado | forEach |
Não suportado |
| Personalizado | flatMapGroupsWithState |
Não suportado |
| Personalizado | mapPartitions |
Não suportado |
| Personalizado | forEachBatch |
Não suportado |
transformWithState em modo de tempo real
transformWithState é suportado em modo em tempo real, com as seguintes diferenças em relação ao processamento micro-batch:
-
handleInputRowsé invocado uma vez por linha em vez de uma vez por tecla por lote. OinputRowsiterador produz um único valor por invocação. - Não são suportados temporizadores de tempo de evento. Os temporizadores de tempo de processamento disparam quando um lote de longa duração termina se não tiver chegado nenhum dado.
-
transformWithStateInPandasnão é suportado.
Pandas UDFs em modo de tempo real
Para minimizar a latência com os UDFs do pandas, defina spark.sql.execution.arrow.maxRecordsPerBatch para 1. Isto otimiza a latência à custa do rendimento. Se a taxa de transferência também for importante, defina esse valor igual 100 ou superior.
Monitorizar o desempenho em modo em tempo real
O modo em tempo real expõe métricas de latência em StreamingQueryProgress debaixo do latencies campo. Aceda a estas métricas através de StreamingQueryListener ou inspecionando a propriedade lastProgress na consulta de streaming.
| Métrica | Description |
|---|---|
processingLatencyMs |
Tempo entre o momento em que um registo é lido pelo fluxo e o momento em que é totalmente processado pelo fluxo |
sourceQueuingLatencyMs |
Tempo entre o momento em que um registo é gravado com êxito no barramento de mensagens (por exemplo, o instante de acrescento ao registo no Kafka) e o momento em que é lido pela primeira vez pelo fluxo |
e2eLatencyMs |
Latência total de ponta a ponta desde o momento em que o registo é produzido na origem até ao momento em que é totalmente processado pelo fluxo |
Cada métrica é apresentada nos percentis p50, p90, p95 e p99.
Limitações
Recomenda-se um fluxo em tempo real por pipeline. São permitidos vários fluxos, mas a contenção de slots de tarefas entre vários fluxos aumenta a latência.
Para uma lista completa das limitações do operador e da fonte, veja Limitações do modo em tempo real.