Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Dados são processados em pipelines por meio de fluxos. Cada fluxo consiste em uma consulta e, normalmente, um destino. O fluxo processa a consulta, seja como um lote, ou incrementalmente como um fluxo de dados para o destino. Um fluxo reside em um pipeline no Lakeflow Spark Declarative Pipelines.
Normalmente, os fluxos são definidos automaticamente quando você cria uma consulta em um pipeline que atualiza um destino, mas você também pode definir explicitamente fluxos adicionais para processamento mais complexo, como acrescentar a um único destino de várias fontes.
Atualizações
Um fluxo é executado sempre que seu pipeline de definição é atualizado. O fluxo criará ou atualizará tabelas com os dados mais recentes disponíveis. Dependendo do tipo de fluxo e do estado das alterações nos dados, a atualização pode executar uma atualização incremental, que processa apenas novos registros ou executa uma atualização completa, que reprocessa todos os registros da fonte de dados.
- Para obter mais informações sobre atualizações de pipeline, consulte Executar uma atualização de pipeline.
- Para obter mais informações sobre como programar e acionar atualizações, consulte Modo de pipeline desencadeado vs. contínuo.
Criar um fluxo padrão
Ao criar um pipeline, você normalmente define uma tabela ou uma exibição junto com a consulta que dá suporte a ela. Por exemplo, nesta consulta SQL, você cria uma tabela de streaming chamada customers_silver lendo da tabela chamada customers_bronze.
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
Você também pode criar a mesma tabela de streaming no Python. No Python, você usa pipelines criando uma função de consulta que retorna um dataframe, com decoradores para adicionar a funcionalidade de Pipelines Declarativos do Lakeflow Spark:
from pyspark import pipelines as dp
@dp.table()
def customers_silver():
return spark.readStream.table("customers_bronze")
Neste exemplo, você criou uma tabela de streaming. Você também pode criar exibições materializadas com sintaxe semelhante no SQL e no Python. Para obter mais informações, consulte Tabelas de streaming e exibições materializadas.
Este exemplo cria um fluxo padrão junto com a tabela de streaming. O fluxo padrão de uma tabela de streaming é um fluxo de acréscimo , que adiciona novas linhas a cada gatilho. Essa é a maneira mais comum de usar pipelines: criar um fluxo e o destino em uma única etapa. Você pode usar esse estilo para ingerir dados ou transformar dados.
Os fluxos de anexação também dão suporte ao processamento que requer ler dados de várias fontes de streaming para atualizar um único alvo. Por exemplo, você pode usar a funcionalidade de acréscimo ao fluxo quando já possui uma tabela e um fluxo de streaming existentes e deseja adicionar uma nova fonte de streaming que grava nesta tabela de streaming existente.
Usando vários fluxos para gravar em um único destino
No exemplo anterior, você criou um fluxo e uma tabela de streaming em uma única etapa. Você também pode criar fluxos para uma tabela criada anteriormente. Neste exemplo, você pode ver a criação de uma tabela e o fluxo associado a ela em etapas separadas. Esse código tem resultados idênticos à criação de um fluxo padrão, incluindo o uso do mesmo nome para a tabela de streaming e o fluxo.
Python
from pyspark import pipelines as dp
# create streaming table
dp.create_streaming_table("customers_silver")
# add a flow
@dp.append_flow(
target = "customers_silver")
def customer_silver():
return spark.readStream.table("customers_bronze")
SQL
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;
-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);
Criar um fluxo independentemente do destino significa que você também pode criar vários fluxos que acrescentam dados ao mesmo destino.
Use o @dp.append_flow decorador na interface do Python ou a CREATE FLOW...INSERT INTO cláusula na interface SQL para criar um novo fluxo, por exemplo, para direcionar uma tabela de streaming a partir de múltiplas fontes de dados. Use o fluxo de anexação para tarefas de processamento, como as seguintes:
- Adicione fontes de streaming que acrescentam dados a uma tabela de streaming existente sem a necessidade de uma atualização completa. Por exemplo, você pode ter uma tabela combinando dados regionais de todas as regiões em que opera. À medida que novas regiões são distribuídas, você pode adicionar os novos dados de região à tabela sem executar uma atualização completa. Para obter um exemplo de como adicionar fontes de streaming à tabela de streaming existente, consulte Exemplo: Gravar em uma tabela de streaming de vários tópicos do Kafka.
- Atualize uma tabela de streaming acrescentando dados históricos ausentes (backfilling). Você pode usar a sintaxe
INSERT INTO ONCEpara criar um preenchimento retroativo histórico que é executado apenas uma vez. Por exemplo, você tem uma tabela de streaming existente que é gravada por um tópico do Apache Kafka. Você também tem dados históricos armazenados em uma tabela que precisa ser inserida exatamente uma vez na tabela de streaming e não pode transmitir os dados porque o processamento inclui a execução de uma agregação complexa antes de inserir os dados. Para obter um exemplo de um backfill, consulte Preenchimento de dados históricos com pipelines. - Combine os dados de várias fontes e escreva-os em uma única tabela de streaming em vez de usar a cláusula
UNIONem uma consulta. O uso de processamento de fluxo de acréscimo em vez deUNIONpermite que você atualize a tabela de destino incrementalmente sem executar uma atualização completa. Para obter um exemplo de uma combinação feita dessa maneira, consulte Exemplo: Utilizar o processamento de fluxo de acréscimo em vez deUNION.
O destino da saída de registros pelo processamento de fluxo de acréscimo pode ser uma tabela existente ou uma nova tabela. Para consultas python, use a função create_streaming_table() para criar uma tabela de destino.
O exemplo a seguir adiciona dois fluxos para o mesmo destino, criando uma união das duas tabelas de origem:
Python
from pyspark import pipelines as dp
# create a streaming table
dp.create_streaming_table("customers_us")
# add the first append flow
@dp.append_flow(target = "customers_us")
def append1():
return spark.readStream.table("customers_us_west")
# add the second append flow
@dp.append_flow(target = "customers_us")
def append2():
return spark.readStream.table("customers_us_east")
SQL
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;
-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);
-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);
Importante
- Se você precisar definir restrições de qualidade de dados com expectativas, defina as expectativas na tabela de destino como parte da
create_streaming_table()função ou em uma definição de tabela existente. Você não pode definir expectativas na@append_flowdefinição. - Os fluxos são identificados por um nome de fluxo e esse nome é usado para identificar pontos de verificação de streaming. O uso do nome do fluxo para identificar o ponto de verificação significa o seguinte:
- Se um fluxo existente em um pipeline for renomeado, o ponto de verificação não será transferido e o fluxo renomeado será efetivamente um fluxo totalmente novo.
- Não é possível reutilizar um nome de fluxo em um pipeline, pois o ponto de verificação existente não corresponderá à nova definição de fluxo.
Tipos de fluxos
Os fluxos padrão para tabelas de streaming e visões materializadas são fluxos de adição. Você também pode criar fluxos para ler de fontes de captura de dados de alteração. A tabela a seguir descreve os diferentes tipos de fluxos.
| Tipo de fluxo | Description |
|---|---|
| Append |
Fluxos de acréscimo são o tipo mais comum de fluxo, em que novos registros na origem são gravados no destino a cada atualização. Eles correspondem ao modo de acréscimo no streaming estruturado. Você pode adicionar o ONCE marcador, indicando uma consulta em lote cujos dados devem ser inseridos no destino apenas uma vez, exceto quando o destino estiver totalmente atualizado. Qualquer número de fluxos de acréscimo pode gravar em um destino específico.Os fluxos padrão (criados com a tabela de streaming de destino ou o modo de exibição materializado) terão o mesmo nome que o destino. Outros alvos não têm fluxos padrão. |
| CDC automático (anteriormente conhecido como aplicar alterações) | Um fluxo Auto CDC ingere uma consulta que contém dados de captura de dados de alteração (CDC). Fluxos CDC automáticos só podem direcionar tabelas de streaming, e a origem deve ser uma fonte de streaming (mesmo no caso de fluxos ONCE). Vários fluxos CDC automáticos podem ser direcionados a uma única tabela de streaming. Uma tabela de streaming que atua como um destino para um fluxo de CDC automático só pode ser direcionada por outros fluxos de CDC automáticos.Para obter mais informações sobre dados CDC, consulte as APIs AUTO CDC: Simplifique a captura de dados de alteração com pipelines. |
Informações adicionais
Para obter mais informações sobre fluxos e seu uso, consulte os seguintes tópicos:
- Exemplos de fluxos em Pipelines Declarativos do Lakeflow Spark
- As APIs AUTO CDC: simplificam a captura de dados de mudanças com pipelines
- Reabastecimento de dados históricos usando pipelines
- Escrevendo pipelines em Python ou SQL
- Tabelas de streaming
- Exibições materializadas
- Destinos em Pipelines Declarativos do Lakeflow Spark