Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Important
Esse recurso está em Visualização Pública.
Esta página descreve o modo em tempo real, um tipo de gatilho para Streaming Estruturado que permite o processamento de dados de latência ultra-baixa com latência de ponta a ponta tão baixa quanto 5 ms. Esse modo foi projetado para cargas de trabalho operacionais que exigem resposta imediata aos dados de streaming.
O modo em tempo real está disponível no Databricks Runtime 16.4 LTS e posterior.
Cargas de trabalho operacionais
As cargas de trabalho de streaming podem ser amplamente divididas em cargas de trabalho analíticas e cargas de trabalho operacionais:
- Os workloads analíticos usam a ingestão e a transformação de dados, geralmente seguindo a arquitetura do medalhão (por exemplo, ingestão de dados nas tabelas bronze, prata e ouro).
- As cargas de trabalho operacionais consomem dados em tempo real, aplicam lógica de negócios e disparam ações ou decisões downstream.
Alguns exemplos de cargas de trabalho operacionais são:
- Bloquear ou sinalizar uma transação de cartão de crédito em tempo real se uma pontuação de fraude exceder um limite, com base em fatores como localização incomum, tamanho de transação grande ou padrões de gastos rápidos.
- Entregar uma mensagem promocional quando os dados de navegação mostram que um usuário está procurando por jeans há cinco minutos, oferecendo um desconto de 25% se comprar nos próximos 15 minutos.
Em geral, os workloads operacionais são caracterizados pela necessidade de latência de ponta a ponta de menos de um segundo. Isso pode ser feito com o modo em tempo real no Streaming Estruturado do Apache Spark.
Como o modo em tempo real obtém baixa latência
O modo em tempo real melhora a arquitetura de execução:
- Executando lotes de execução longa (o padrão é 5 minutos), em que os dados são processados conforme ficam disponíveis na origem.
- Todos os estágios da consulta são agendados simultaneamente. Isso requer que o número de slots de tarefas disponíveis seja igual ou maior que o número de tarefas de todos os estágios em um lote.
- Os dados são transmitidos entre os estágios assim que são produzidos, usando um streaming de embaralhamento.
Ao final do processamento de um lote e antes do início do próximo lote, os pontos de verificação de Streaming Estruturado progridem e disponibilizam métricas para o último lote. Se os lotes forem maiores, essas atividades poderão ser menos frequentes, levando a repetições mais longas em caso de falha e atraso na disponibilidade de métricas. Por outro lado, se os lotes forem menores, essas atividades se tornarão mais frequentes, potencialmente afetando a latência. O Databricks recomenda que você faça benchmark do modo em tempo real em relação à carga de trabalho de destino e aos requisitos para encontrar o intervalo de gatilho apropriado.
Configuração do cluster
Para usar o modo em tempo real no Streaming Estruturado, você deve configurar um trabalho clássico do Lakeflow:
No workspace do Azure Databricks, clique em Novo no canto superior esquerdo. Escolha Mais e clique em Cluster.
Limpar Aceleração do Photon.
Limpar Habilitar o dimensionamento automático.
Em Performance avançada, limpe Usar instâncias spot.
No modo Avançado e de Acesso, clique em Manual e selecione Dedicado (anteriormente: Usuário único).
No Spark, insira o seguinte em configuração do Spark:
spark.databricks.streaming.realTimeMode.enabled trueClique em Criar.
Requisitos de tamanho do cluster
Você poderá executar um trabalho em tempo real por cluster se o cluster tiver slots de tarefa suficientes.
Para ser executado no modo de baixa latência, o número total de slots de tarefa disponíveis deve ser maior ou igual ao número de tarefas em todos os estágios de consulta.
Exemplos de cálculo de slots
Pipeline sem estado de estágio único (fonte Kafka + coletor):
Se maxPartitions = 8, você precisará de pelo menos 8 slots. Se maxPartitions não está definido, use o número de partições do tópico Kafka.
Pipeline com estado em dois estágios (fonte Kafka + embaralhamento):
Se maxPartitions = 8 e partições de shuffle = 20, você precisará de 8 + 20 = 28 slots.
Pipeline de três estágios (fonte Kafka + embaralhamento + repartição):
Com maxPartitions = 8 e dois estágios de embaralhamento de 20 cada, são necessários 8 + 20 + 20 = 48 slots.
Principais considerações
Ao configurar o cluster, leve isso em consideração:
- Ao contrário do modo de microlote, as tarefas em tempo real podem permanecer ociosas enquanto aguardam dados, portanto, o dimensionamento correto é essencial para evitar o desperdício de recursos.
- Objetive um nível de utilização alvo (por exemplo, 50%) ajustando:
-
maxPartitions(para Kafka) -
spark.sql.shuffle.partitions(para estágios de embaralhamento)
-
- O Databricks recomenda definir maxPartitions para que cada tarefa trate várias partições kafka para reduzir a sobrecarga.
- Ajuste os slots de tarefa por trabalhador para corresponder à carga de trabalho para tarefas simples de estágio único.
- No caso de trabalhos com muito embaralhamento, faça experiências para encontrar o número mínimo de partições de embaralhamento que evitem listas de pendências e ajuste a partir daí. O trabalho não será agendado se o cluster não tiver slots suficientes.
Note
A partir do Databricks Runtime 16.4 LTS e posterior, todos os pipelines em tempo real usam checkpoint v2, permitindo alternar de forma contínua entre os modos em tempo real e microlote.
Configuração de consulta
Você deve habilitar o gatilho em tempo real para especificar que uma consulta deve ser executada usando o modo de baixa latência. Além disso, os gatilhos em tempo real têm suporte apenas no modo de atualização. Por exemplo:
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# in PySpark, realTime trigger requires you to specify the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Observability
Anteriormente, a latência de consulta de ponta a ponta estava intimamente vinculada à duração do lote, tornando a duração do lote um bom indicador de latência de consulta. No entanto, esse método não se aplica mais no modo em tempo real, exigindo abordagens alternativas para medir a latência. A latência de ponta a ponta é específica da carga de trabalho e, às vezes, só pode ser medida com precisão com a lógica de negócios. Por exemplo, se o carimbo de data/hora de origem for gerado no Kafka, a latência poderá ser calculada como a diferença entre o carimbo de data/hora de saída do Kafka e o carimbo de data/hora de origem.
Você pode estimar a latência de ponta a ponta de várias maneiras com base em informações parciais coletadas durante o processo de streaming.
Usar StreamingQueryProgress
As métricas a seguir são incluídas no StreamingQueryProgress evento, que é registrado automaticamente nos logs de driver. Você também pode acessá-los através da função de retorno de chamada StreamingQueryListener de onQueryProgress().
QueryProgressEvent.json() ou toString() incluem métricas adicionais de modo em tempo real.
- Latência de processamento (processingLatencyMs). O tempo decorrido entre o momento em que a consulta em modo em tempo real lê um registro e quando ele é registrado no próximo estágio ou no processamento subsequente. Para consultas de estágio único, isso mede a mesma duração que a latência E2E. Essa métrica é relatada por tarefa.
- Latência de enfileiramento da fonte (sourceQueuingLatencyMs). O tempo decorrido entre o momento em que um registro é gravado com sucesso em um barramento de mensagens, por exemplo, o tempo de anexação do log no Kafka, e o momento em que o registro foi lido pela primeira vez pela consulta no modo em tempo real. Essa métrica é relatada por tarefa.
- Latência E2E (e2eLatencyMs). O tempo entre o momento em que o registro é gravado com sucesso em um barramento de mensagens e o momento em que o registro é gravado downstream pela consulta no modo em tempo real. Essa métrica é agregada por lote em todos os registros processados por todas as tarefas.
Por exemplo:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},
Usar a API Observe em tarefas
A API de Observação ajuda a medir a latência sem iniciar outro trabalho. Se você tiver um carimbo de data/hora de origem que se aproxime do horário de chegada dos dados de origem e ele for passado antes de chegar ao coletor, ou se puder encontrar uma maneira de passar o carimbo de data/hora, poderá estimar a latência de cada lote usando a API Observe:
Python
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
Scala
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
Neste exemplo, um carimbo de data/hora atual é registrado antes de gerar a entrada e a latência é estimada calculando a diferença entre esse carimbo de data/hora e o carimbo de data/hora de origem do registro. Os resultados são incluídos em relatórios de progresso e disponibilizados aos ouvintes. Aqui está um resultado de exemplo:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}
O que tem suporte?
Environments
| Tipo de cluster | Supported |
|---|---|
| Dedicado (anteriormente: usuário único) | Yes |
| Standard (anteriormente: compartilhado) | No |
| Lakeflow Spark Declarative Pipelines Classic (Pipelines Declarativas do Lakeflow Spark Classic) | No |
| Pipelines Declarativos do Lakeflow Spark sem servidor | No |
| Serverless | No |
Languages
| Linguagem | Supported |
|---|---|
| Scala | Yes |
| Java | Yes |
| Python | Yes |
Modos de execução
| Modo de Execução | Supported |
|---|---|
| Modo de atualização | Yes |
| modo de acréscimo | No |
| Modo completo | No |
Sources
| Sources | Supported |
|---|---|
| Apache Kafka | Yes |
| AWS MSK | Yes |
| Eventhub (usando o Conector Kafka) | Yes |
| Kinesis | Sim (somente modo EFO) |
| Google Pub/Sub | No |
| Apache Pulsar | No |
Sinks
| Sinks | Supported |
|---|---|
| Apache Kafka | Yes |
| Eventhub (usando o Conector Kafka) | Yes |
| Kinesis | No |
| Google Pub/Sub | No |
| Apache Pulsar | No |
| Coletores arbitrários (usando forEachWriter) | Yes |
Operators
| Operators | Supported |
|---|---|
| Operações sem estado | |
|
Yes |
|
Yes |
| UDFs | |
|
Sim (com algumas limitações) |
|
Sim (com algumas limitações) |
| Aggregation | |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
| Funções de agregação | Yes |
| Windowing | |
|
Yes |
|
Yes |
|
No |
| Deduplication | |
|
Sim (o estado não está limitado) |
|
No |
| Fluxo - Junção de tabelas | |
|
Yes |
| Fluxo - Junção de fluxo | No |
| (simples)MapGroupsWithState | No |
| transformWithState | Sim (com algumas diferenças) |
| união | Sim (com algumas limitações) |
| forEach | Yes |
| forEachBatch | No |
| mapPartitions | Não (ver limitação) |
Usar transformWithState no modo em tempo real
Para criar aplicativos personalizados com estado, o Databricks oferece suporte a transformWithState, uma API no Streaming Estruturado do Apache Spark. Consulte Criar um aplicativo com estado personalizado para obter mais informações sobre a API e os snippets de código.
No entanto, há algumas diferenças entre como a API se comporta no modo em tempo real e as consultas de streaming tradicionais que aproveitam a arquitetura de microlote.
- O método no modo em tempo real
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)é chamado para cada linha.- O
inputRowsiterador retorna um único valor. No modo microlote, ele é chamado uma vez para cada chave e o iteradorinputRowsretorna todos os valores para uma chave no microlote. - Você deve estar ciente dessa diferença ao escrever seu código.
- O
- Não há suporte para temporizadores de evento no modo em tempo real.
- No modo em tempo real, os temporizadores são atrasados no disparo, dependendo da chegada dos dados. Caso contrário, se não houver dados, ele será disparado no final do lote de execução prolongada. Por exemplo, se um temporizador deve ser acionado às 10:00:00 e não houver nenhuma chegada de dados simultaneamente, ele não será acionado. Em vez disso, se os dados chegarem às 10:00:10, o temporizador será acionado com um atraso de 10 segundos. Ou, se nenhum dado chegar e o lote de execução prolongada estiver sendo encerrado, ele executará o cronômetro antes de encerrar o lote de execução prolongada.
Python UDFs
O Databricks dá suporte à maioria das UDFs (funções definidas pelo usuário) do Python no modo em tempo real:
| Tipo de UDF | Supported |
|---|---|
| UDF sem estado | |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
| UDF de agrupamento com estado (UDAF) | |
|
Yes |
|
No |
| UDF de agrupamento não com estado (UDAF) | |
|
No |
|
No |
|
No |
| Função Tabela | |
|
No |
| UC UDF | No |
Há vários pontos a serem considerados ao usar UDFs do Python no modo em tempo real:
- Para minimizar a latência, configure o tamanho do lote de seta (spark.sql.execution.arrow.maxRecordsPerBatch) como 1.
- Compensação: essa configuração otimiza a latência em detrimento da taxa de transferência. Para a maioria das cargas de trabalho, essa configuração é recomendada.
- Aumente o tamanho do lote somente se uma taxa de transferência maior for necessária para acomodar o volume de entrada, aceitando o potencial aumento na latência.
- As UDFs e as funções do Pandas não têm um bom desempenho com um tamanho de lote de seta de 1.
- Se você usar UDFs ou funções pandas, defina o tamanho do lote de seta como um valor mais alto (por exemplo, 100 ou superior).
- Observe que isso implica uma latência maior. O Databricks recomenda o uso de UDF ou função de seta, se possível.
- Devido ao problema de desempenho com pandas, transformWithState só tem suporte com a
Rowinterface.
Técnicas de otimização
| Technique | Habilitado por padrão |
|---|---|
| Acompanhamento de progresso assíncrono: move a gravação para compensar o log e confirmar o log em um thread assíncrono, reduzindo o tempo entre lotes entre dois micro-lotes. Isso pode ajudar a reduzir a latência de consultas de streaming sem estado. | No |
| Ponto de verificação de estado assíncrono: ajuda a reduzir a latência de consultas de streaming com estado iniciando o processamento do próximo microlote assim que a computação do microlote anterior for concluída, sem aguardar o ponto de verificação de estado. | No |
Limitations
Limitação de origem
Para o Kinesis, não há suporte para o modo de sondagem. Além disso, repartições frequentes podem afetar negativamente a latência.
Limitação da união
Para a União, há algumas limitações:
- Não há suporte para a auto-união:
- Kafka: Você não pode usar o mesmo objeto de quadro de dados de origem e realizar a união de quadros de dados derivados dele. Solução alternativa: use DataFrames diferentes que leem da mesma fonte.
- Kinesis: não é possível unificar quadros de dados derivados da mesma fonte Kinesis com a mesma configuração. Solução alternativa: além de usar dataframes diferentes, você pode atribuir uma opção "consumerName" diferente a cada DataFrame.
- Operadores com estado (por exemplo,
aggregate,deduplicate,transformWithState) definidos antes da União não são suportados. - Não há suporte para a união com fontes de lote.
Limitação de MapPartitions
mapPartitions em Scala e em APIs semelhantes do Python (mapInPandas, mapInArrow) pegam um iterador que percorre toda a partição de entrada e geram um iterador que percorre toda a saída com mapeamento arbitrário entre entrada e saída. Essas APIs podem causar problemas de desempenho no Modo Real-Time de Streaming bloqueando toda a saída, o que aumenta a latência. A semântica dessas APIs não suporta bem a propagação de marca d'água.
Use UDFs escalares combinados com transformar tipos de dados complexos ou filter, em vez disso, para obter funções semelhantes.
Examples
Os exemplos abaixo mostram consultas com suporte.
Consultas sem estado
Há suporte para qualquer consulta sem estado de estágio único ou múltiplo.
Origem kafka para coletor Kafka
Neste exemplo, você lê de uma fonte do Kafka e grava em um coletor do Kafka.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Reparticionar
Neste exemplo, você lê de uma fonte do Kafka, reparte os dados em 20 partições e grava em um coletor do Kafka.
Defina a configuração spark.sql.execution.sortBeforeRepartition do Spark para false antes de usar a repartição.
Python
# Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
// Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Junção de instantâneos de fluxo (somente difusão)
Neste exemplo, você lê do Kafka, une os dados a uma tabela estática e grava em um coletor Kafka. Observe que somente as uniões estáticas de fluxo que difundem a tabela estática são compatíveis, o que significa que a tabela estática deve caber na memória.
Python
from pyspark.sql.functions import broadcast, expr
# We assume the static table in the path `stateic_table_location` has a column 'lookupKey'.
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.withColumn("joinKey", expr("CAST(value AS STRING)"))
.join(
broadcast(spark.read.format("parquet").load(static_table_location)),
expr("joinKey = lookupKey")
)
.selectExpr("value AS key", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Fonte do Kinesis para o coletor do Kafka
Neste exemplo, você lê de uma fonte do Kinesis e grava em um coletor do Kafka.
Python
query = (
spark.readStream
.format("kinesis")
.option("region", region_name)
.option("awsAccessKey", aws_access_key_id)
.option("awsSecretKey", aws_secret_access_key)
.option("consumerMode", "efo")
.option("consumerName", consumer_name)
.load()
.selectExpr("parttitionKey AS key", "CAST(data AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kinesis")
.option("region", regionName)
.option("awsAccessKey", awsAccessKeyId)
.option("awsSecretKey", awsSecretAccessKey)
.option("consumerMode", "efo")
.option("consumerName", consumerName)
.load()
.select(
col("partitionKey").alias("key"),
col("data").cast("string").alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Union
Neste exemplo, você une dois DataFrames do Kafka de dois tópicos diferentes e grava em um coletor do Kafka.
Python
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)
df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)
query = (
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Consultas com estado
Deduplication
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.dropDuplicates(["timestamp", "value"])
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.dropDuplicates("timestamp", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Aggregation
Python
from pyspark.sql.functions import col
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
União com Agregação
Neste exemplo, primeiro você une dois dataframes Kafka de dois tópicos diferentes e, em seguida, faz uma agregação. No final, você escreve no coletor do Kafka.
Python
from pyspark.sql.functions import col
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)
df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)
query = (
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
TransformWithState
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import org.apache.spark.sql.streaming.{ListState, MapState, StatefulProcessor, OutputMode, TTLConfig, TimeMode, TimerValues, ValueState}
/**
* This processor counts the number of records it has seen for each key using state variables
* with TTLs. It redundantly maintains this count with a value, list, and map state to put load
* on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
* the count for a given grouping key.)
*
* The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
* The source-timestamp is passed through so that we can calculate end-to-end latency. The output
* schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
*
*/
class RTMStatefulProcessor(ttlConfig: TTLConfig)
extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
@transient private var _value: ValueState[Long] = _
@transient private var _map: MapState[Long, String] = _
@transient private var _list: ListState[String] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
// Counts the number of records this key has seen
_value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
_map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
_list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, Long)],
timerValues: TimerValues): Iterator[(String, Long, Long)] = {
inputRows.map { row =>
val key = row._1
val sourceTimestamp = row._2
val oldValue = _value.get()
_value.update(oldValue + 1)
_map.updateValue(oldValue, key)
_list.appendValue(key)
(key, oldValue + 1, sourceTimestamp)
}
}
}
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
.as[(String, String, Timestamp)]
.groupByKey(row => row._1)
.transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
.as[(String, Long, Long)]
.select(
col("_1").as("key"),
col("_2").as("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
Note
Há uma diferença entre a forma como o modo em tempo real e outros modos de execução no Streaming Estruturado executam o StatefulProcessor em transformWithState. Consulte Use transformWithState no modo em tempo real
TransformWithState (PySpark, interface row)
from typing import Iterator, Tuple
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType
class RTMStatefulProcessor(StatefulProcessor):
"""
This processor counts the number of records it has seen for each key using state variables
with TTLs. It redundantly maintains this count with a value, list, and map state to put load
on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
the count for a given grouping key.)
The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
The source-timestamp is passed through so that we can calculate end-to-end latency. The output
schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
"""
def init(self, handle: StatefulProcessorHandle) -> None:
state_schema = StructType([StructField("value", LongType(), True)])
self.value_state = handle.getValueState("value", state_schema, 30000)
map_key_schema = StructType([StructField("key", LongType(), True)])
map_value_schema = StructType([StructField("value", StringType(), True)])
self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
list_schema = StructType([StructField("value", StringType(), True)])
self.list_state = handle.getListState("list", list_schema, 30000)
def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
for row in rows:
# row is a tuple (key, source_timestamp)
key_str = row[0]
source_timestamp = row[1]
old_value = value.get()
if old_value is None:
old_value = 0
self.value_state.update((old_value + 1,))
self.map_state.update((old_value,), (key_str,))
self.list_state.appendValue((key_str,))
yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)
def close(self) -> None:
pass
output_schema = StructType(
[
StructField("key", StringType(), True),
StructField("value", LongType(), True),
StructField("timestamp", TimestampType(), True),
]
)
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
.groupBy("key")
.transformWithState(
statefulProcessor=RTMStatefulProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="processingTime",
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("Update")
.start()
)
Note
Há uma diferença entre como o modo em tempo real e outros modos de execução no Fluxo Estruturado são executados StatefulProcessor em transformWithState. Consulte Use transformWithState no modo em tempo real
Sinks
Gravar no Postgres via foreachSink
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}
/**
* Groups connection properties for
* the JDBC writers.
*
* @param url JDBC url of the form jdbc:subprotocol:subname to connect to
* @param dbtable database table that should be written into
* @param username username for authentication
* @param password password for authentication
*/
class JdbcWriterConfig(
val url: String,
val dbtable: String,
val username: String,
val password: String,
) extends Serializable
/**
* Handles streaming data writes to a database sink via JDBC, by:
* - connecting to the database
* - buffering incoming data rows in batches to reduce write overhead
*
* @param config connection parameters and configuration knobs for the writer
*/
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
extends ForeachWriter[Row] with Serializable {
// The writer currently only supports this hard-coded schema
private val UPSERT_STATEMENT_SQL =
s"""MERGE INTO "${config.dbtable}"
|USING (
| SELECT
| CAST(? AS INTEGER) AS "id",
| CAST(? AS CHARACTER VARYING) AS "data"
|) AS "source"
|ON "test"."id" = "source"."id"
|WHEN MATCHED THEN
| UPDATE SET "data" = "source"."data"
|WHEN NOT MATCHED THEN
| INSERT ("id", "data") VALUES ("source"."id", "source"."data")
|""".stripMargin
private val MAX_BUFFER_SIZE = 3
private val buffer = new Array[Row](MAX_BUFFER_SIZE)
private var bufferSize = 0
private var connection: Connection = _
/**
* Flushes the [[buffer]] by writing all rows in the buffer to the database.
*/
private def flushBuffer(): Unit = {
require(connection != null)
if (bufferSize == 0) {
return
}
var upsertStatement: PreparedStatement = null
try {
upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)
for (i <- 0 until bufferSize) {
val row = buffer(i)
upsertStatement.setInt(1, row.getAs[String]("key"))
upsertStatement.setString(2, row.getAs[String]("value"))
upsertStatement.addBatch()
}
upsertStatement.executeBatch()
connection.commit()
bufferSize = 0
} catch { case e: Exception =>
if (connection != null) {
connection.rollback()
}
throw e
} finally {
if (upsertStatement != null) {
upsertStatement.close()
}
}
}
override def open(partitionId: Long, epochId: Long): Boolean = {
connection = DriverManager.getConnection(config.url, config.username, config.password)
true
}
override def process(row: Row): Unit = {
buffer(bufferSize) = row
bufferSize += 1
if (bufferSize >= MAX_BUFFER_SIZE) {
flushBuffer()
}
}
override def close(errorOrNull: Throwable): Unit = {
flushBuffer()
if (connection != null) {
connection.close()
connection = null
}
}
}
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.outputMode(OutputMode.Update())
.trigger(defaultTrigger)
.foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
.start()
Display
Important
Esse recurso está disponível no Databricks Runtime 17.1 e posterior.
Origem da taxa de exibição
Neste exemplo, você lê de uma fonte de taxa e exibe o DataFrame de streaming em um notebook.
Python
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode
val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())