Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Importante
A update_flow API está em Visualização Pública.
Use o @dp.update_flow decorador para criar um fluxo de atualização. Atualizar fluxos gravam em coletores usando o modo de saída de atualização, emitindo apenas as linhas que foram alteradas em cada lote. Ao contrário dos fluxos de acréscimo, eles dão suporte a agregações com estado sem a necessidade de uma marca d'água.
Os fluxos de atualização só podem direcionar coletores. Não há suporte para tabelas delta.
Syntax
from pyspark import pipelines as dp
dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})
@dp.update_flow(
target = "<sink-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
comment = "<comment>", # optional
import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
return (<streaming-query>)
Parameters
| Parâmetro | Tipo | Description |
|---|---|---|
| função | function |
Required. Uma função que retorna um DataFrame de streaming do Apache Spark de uma consulta definida pelo usuário. |
target |
str |
Required. O nome do coletor no qual esse fluxo grava. |
name |
str |
O nome do fluxo. Se não for fornecido, o padrão será o nome da função. |
comment |
str |
Uma descrição para o fluxo. |
spark_conf |
dict |
Um ditado das configurações do Spark para a execução dessa consulta. Essas configurações substituem as configurações definidas para o destino, o pipeline ou o cluster. |
import_checkpoint |
str |
Um caminho de ponto de verificação externo a ser importado antes de iniciar o fluxo. Importado apenas uma vez, quando o diretório de ponto de verificação do fluxo ainda não existe. |
Exemplos
Agregação para um coletor Kafka
Gravar resultados de agregação com estado em um coletor Kafka:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type")
.groupBy(col("event_type"))
.count()
)
Modo em tempo real
Use spark_conf para configurar um fluxo de atualização para o modo em tempo real:
from pyspark import pipelines as dp
dp.create_sink("my_kafka_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
)
Limitações
- Não há suporte para coletores de tabela delta como destinos para fluxos de atualização.