Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Este artigo descreve como você pode usar pipelines para declarar transformações em conjuntos de dados e especificar como os registros são processados por meio da lógica de consulta. Ele também contém exemplos de padrões de transformação comuns para a construção de pipelines.
Você pode definir um conjunto de dados em relação a qualquer consulta que retorna um DataFrame. Você pode usar operações internas do Apache Spark, UDFs, lógica personalizada e modelos MLflow como transformações em Lakeflow Spark Declarative Pipelines. Depois de os dados serem ingeridos no seu pipeline, poderá definir novos conjuntos de dados relacionados com fontes upstream para criar novas tabelas de fluxo contínuo, vistas materializadas e vistas.
Para saber como efetuar com eficiência o processamento com estado em um pipeline, consulte Otimizar o processamento com estado utilizando marcas d'água.
Quando usar vistas, vistas materializadas e tabelas de fluxo
Ao implementar suas consultas de pipeline, escolha o melhor tipo de conjunto de dados para garantir que eles sejam eficientes e fáceis de manter.
Considere usar um modo de exibição para fazer o seguinte:
- Divida uma consulta grande ou complexa que você deseja em consultas mais fáceis de gerenciar.
- Valide resultados intermediários usando expectativas.
- Reduza os custos de armazenamento e computação para resultados que não precisa manter. Como as tabelas são materializadas, elas exigem recursos adicionais de computação e armazenamento.
Considere o uso de uma visão materializada quando:
- Várias consultas downstream consomem a tabela. Como as vistas são computadas sob demanda, a vista é recalculada toda vez que é consultada.
- Outros sistemas de processamento, trabalhos ou consultas consomem a tabela. Como as visões não são materializadas, só podem ser usadas no mesmo pipeline.
- Você deseja exibir os resultados de uma consulta durante o desenvolvimento. Como as tabelas são materializadas e podem ser visualizadas e consultadas fora do pipeline, o uso de tabelas durante o desenvolvimento pode ajudar a validar a correção dos cálculos. Após a validação, converta consultas que não exigem materialização em visualizações.
Considere o uso de uma tabela de streaming quando:
- Uma consulta é definida em relação a uma fonte de dados que está crescendo contínua ou incrementalmente.
- Os resultados da consulta devem ser calculados incrementalmente.
- O pipeline precisa de alta taxa de transferência e baixa latência.
Observação
As tabelas de streaming são sempre definidas a partir das fontes de streaming. Você também pode usar fontes de streaming com AUTO CDC ... INTO para aplicar atualizações de feeds do CDC. Consulte As APIs do AUTO CDC: Simplifique a captura de dados de alteração com pipelines.
Excluir tabelas do esquema de destino
Se você precisar calcular tabelas intermediárias não destinadas ao consumo externo, poderá impedir que elas sejam publicadas em um esquema usando a palavra-chave TEMPORARY. As tabelas temporárias ainda armazenam e processam dados de acordo com a semântica do Lakeflow Spark Declarative Pipelines, mas não devem ser acessadas fora do pipeline atual. Uma tabela temporária persiste durante o tempo de vida do pipeline que a cria. Use a sintaxe a seguir para declarar tabelas temporárias:
SQL
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
Python
@dp.table(
temporary=True)
def temp_table():
return ("...")
Combine tabelas de streaming e visualizações materializadas em um único pipeline
As tabelas de fluxo contínuo herdam as garantias de processamento do sistema Apache Spark Structured Streaming e são configuradas para processar consultas de fontes de dados que apenas permitem acréscimos, nas quais novas linhas são sempre inseridas na tabela de origem e não modificadas.
Observação
Embora, por predefinição, as tabelas de streaming exijam fontes de dados exclusivamente de acréscimo, quando uma fonte de streaming é outra tabela de streaming que necessite de atualizações ou exclusões, é possível alterar este comportamento utilizando a flag skipChangeCommits.
Um padrão de streaming comum envolve a ingestão de dados de origem para criar os conjuntos de dados iniciais em um pipeline. Esses conjuntos de dados iniciais são comumente chamados de tabelas de bronze e geralmente executam transformações simples.
Por outro lado, as tabelas finais num pipeline, comumente chamadas de tabelas douradas, geralmente exigem agregações complicadas ou leitura de alvos de uma operação AUTO CDC ... INTO. Como essas operações criam inerentemente atualizações em vez de acréscimos, elas não são suportadas como entradas para tabelas de streaming. Essas transformações são mais adequadas para visões materializadas.
Ao misturar tabelas de streaming e exibições materializadas em um único pipeline, você pode simplificar seu pipeline, evitar a dispendiosa reingestão ou reprocessamento de dados brutos e ter todo o poder do SQL para calcular agregações complexas em um conjunto de dados codificado e filtrado de forma eficiente. O exemplo a seguir ilustra esse tipo de processamento misto:
Observação
Estes exemplos usam o Auto Loader para carregar arquivos do armazenamento em nuvem. Para carregar arquivos com o Auto Loader num pipeline com Unity Catalog, você deve usar locais externos. Para saber mais sobre como usar o Unity Catalog com pipelines, consulte Usar o Unity Catalog com pipelines.
Python
@dp.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://path/to/raw/data")
)
@dp.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return spark.readStream.table("streaming_bronze").where(...)
@dp.materialized_view
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return spark.read.table("streaming_silver").groupBy("user_id").count()
SQL
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM STREAM read_files(
"abfss://path/to/raw/data",
format => "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...
CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id
Saiba mais sobre como usar do Auto Loader para ingerir incrementalmente arquivos JSON do armazenamento do Azure.
Junções estáticas de fluxo de dados
As junções estáticas de fluxo são uma boa opção ao desnormalizar um fluxo contínuo de dados somente de acréscimo com uma tabela de dimensões principalmente estática.
Com cada atualização do pipeline, os novos registos do fluxo são unidos ao instantâneo mais atual da tabela estática. Se os registros forem adicionados ou atualizados na tabela estática depois que os dados correspondentes da tabela de streaming tiverem sido processados, os registros resultantes não serão recalculados, a menos que uma atualização completa seja executada.
Em pipelines configurados para execução acionada, a tabela estática retorna resultados a partir do momento em que a atualização foi iniciada. Em pipelines configurados para execução contínua, a versão mais recente da tabela estática é consultada sempre que a tabela processa uma atualização.
Segue-se um exemplo de uma junção estática de fluxo:
Python
@dp.table
def customer_sales():
return spark.readStream.table("sales").join(spark.read.table("customers"), ["customer_id"], "left")
SQL
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
INNER JOIN LEFT customers USING (customer_id)
Calcule agregados de forma eficiente
Você pode usar tabelas de streaming para calcular incrementalmente agregados distributivos simples, como contagem, min, max ou soma, e agregados algébricos, como média ou desvio padrão. O Databricks recomenda a agregação incremental para consultas com um número limitado de grupos, como uma consulta com uma cláusula GROUP BY country. Apenas novos dados de entrada são lidos com cada atualização.
Para saber mais sobre como escrever consultas Lakeflow Spark Declarative Pipelines que executam agregações incrementais, consulte Executar agregações em janela com marcas d'água.
Usar modelos MLflow em Lakeflow Spark Declarative Pipelines
Observação
Para utilizar modelos MLflow num pipeline compatível com o Unity Catalog, o seu pipeline deve ser configurado para usar o canal preview. Para usar o canal current, você deve configurar seu pipeline para publicar no metastore do Hive.
Você pode usar os modelos treinados em MLflow em pipelines. Os modelos MLflow são tratados como transformações no Azure Databricks, o que significa que atuam sobre uma entrada do Spark DataFrame e retornam resultados como um Spark DataFrame. Como o Lakeflow Spark Declarative Pipelines define conjuntos de dados em relação a DataFrames, você pode converter cargas de trabalho do Apache Spark que usam MLflow em pipelines com apenas algumas linhas de código. Para obter mais informações sobre MLflow, consulte MLflow para ciclo de vida do modelo ML.
Caso já tenha um script Python chamando um modelo MLflow, pode adaptar esse código a um pipeline usando o @dp.table ou o @dp.materialized_view decorador e garantindo que as funções devolvam os resultados das transformações. O Lakeflow Spark Declarative Pipelines não instala o MLflow por padrão, portanto, confirme se você instalou as bibliotecas MLFlow com %pip install mlflow e importou mlflow e dp na parte superior da sua fonte. Para obter uma introdução à sintaxe de pipeline, consulte Desenvolver código de pipeline com Python.
Para usar modelos MLflow em pipelines, conclua as seguintes etapas:
- Obtenha a ID de execução e o nome do modelo MLflow. O ID de execução e o nome do modelo são usados para construir o URI do modelo MLflow.
- Use o URI para definir um UDF do Spark para carregar o modelo MLflow.
- Chame o UDF em suas definições de tabela para usar o modelo MLflow.
O exemplo a seguir mostra a sintaxe básica para esse padrão:
%pip install mlflow
from pyspark import pipelines as dp
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dp.materialized_view
def model_predictions():
return spark.read.table(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
Como um exemplo completo, o código a seguir define um Spark UDF chamado loaded_model_udf que carrega um modelo MLflow treinado em dados de risco de empréstimo. As colunas de dados usadas para fazer a previsão são passadas como argumento para a UDF. A tabela loan_risk_predictions calcula previsões para cada linha em loan_risk_input_data.
%pip install mlflow
from pyspark import pipelines as dp
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dp.materialized_view(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return spark.read.table("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
Reter exclusões ou atualizações manuais
O Lakeflow Spark Declarative Pipelines permite excluir ou atualizar manualmente registros de uma tabela e fazer uma operação de atualização para recalcular tabelas downstream.
Por padrão, os pipelines recalculam os resultados da tabela com base nos dados de entrada cada vez que são atualizados, portanto, você deve garantir que o registro excluído não seja recarregado dos dados de origem. Definir a propriedade pipelines.reset.allowed table como false impede atualizações em uma tabela, mas não impede que gravações incrementais nas tabelas ou novos dados fluam para a tabela.
O diagrama a seguir ilustra um exemplo usando duas tabelas de streaming:
-
raw_user_tableingere dados brutos do usuário de uma fonte. -
bmi_tablecalcula de forma incremental os valores do IMC usando o peso e a altura deraw_user_table.
Você deseja excluir ou atualizar manualmente os registros de usuário do raw_user_table e recalcular o bmi_table.
O código a seguir demonstra a configuração da propriedade pipelines.reset.allowed da tabela para false de forma a desativar a atualização completa para raw_user_table, garantindo que as alterações pretendidas sejam mantidas ao longo do tempo, enquanto as tabelas a jusante são recalculadas quando uma atualização do pipeline é executada.
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM STREAM read_files("/databricks-datasets/iot-stream/data-user", format => "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);