Partilhar via


Gerencie a qualidade dos dados com o Delta Live Tables

Você usa expectativas para definir restrições de qualidade de dados no conteúdo de um conjunto de dados. As expectativas permitem garantir que os dados que chegam às tabelas atendam aos requisitos de qualidade dos dados e forneçam informações sobre a qualidade dos dados para cada atualização de pipeline. Você aplica expectativas a consultas usando decoradores Python ou cláusulas de restrição SQL.

Quais são as expectativas da Delta Live Tables?

As expectativas são cláusulas opcionais que você adiciona às declarações do conjunto de dados Delta Live Tables que aplicam verificações de qualidade de dados em cada registro que passa por uma consulta.

Uma expectativa consiste em três coisas:

  • Uma descrição, que atua como um identificador exclusivo e permite rastrear métricas para a restrição.
  • Uma instrução booleana que sempre retorna true ou false com base em alguma condição declarada.
  • Uma ação a ser tomada quando um registro falha na expectativa, o que significa que o booleano retorna false.

A matriz a seguir mostra as três ações que você pode aplicar a registros inválidos:

Ação Resultado
avisar (padrão) Registros inválidos são gravados no destino; A falha é relatada como uma métrica para o conjunto de dados.
gota Registros inválidos são descartados antes que os dados sejam gravados no destino; A falha é relatada como uma métrica para o conjunto de dados.
reprovado Registros inválidos impedem que a atualização seja bem-sucedida. É necessária uma intervenção manual antes do reprocessamento.

Você pode exibir métricas de qualidade de dados, como o número de registros que violam uma expectativa, consultando o log de eventos do Delta Live Tables. Consulte Monitorar pipelines Delta Live Tables.

Para obter uma referência completa da sintaxe da declaração do conjunto de dados Delta Live Tables, consulte Referência da linguagem Python Delta Live Tables ou Referência da linguagem SQL Delta Live Tables.

Nota

  • Embora você possa incluir várias cláusulas em qualquer expectativa, apenas o Python suporta a definição de ações com base em várias expectativas. Consulte Expectativas múltiplas.
  • As expectativas devem ser definidas usando expressões SQL. Não é possível usar sintaxe não-SQL (por exemplo, funções Python) ao definir uma expectativa.

Reter registos inválidos

Use o expect operador quando quiser manter registros que violem a expectativa. Os registros que violam a expectativa são adicionados ao conjunto de dados de destino junto com registros válidos:

Python

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Descartar registros inválidos

Use o expect or drop operador para evitar o processamento adicional de registros inválidos. Os registros que violam a expectativa são descartados do conjunto de dados de destino:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Falha em registros inválidos

Quando registros inválidos forem inaceitáveis, use o operador para interromper a expect or fail execução imediatamente quando um registro falhar na validação. Se a operação for uma atualização de tabela, o sistema reverte atomicamente a transação:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Importante

Se você tiver vários fluxos paralelos definidos em um pipeline, a falha de um único fluxo não fará com que outros fluxos falhem.

Quando um pipeline falha devido a uma violação de expectativa, você deve corrigir o código do pipeline para manipular os dados inválidos corretamente antes de executar novamente o pipeline.

As expectativas de falha modificam o plano de consulta do Spark de suas transformações para rastrear as informações necessárias para detetar e relatar violações. Para muitas consultas, você pode usar essas informações para identificar qual registro de entrada resultou na violação. Segue-se um exemplo de exceção:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

Expectativas múltiplas

Você pode definir expectativas com uma ou mais restrições de qualidade de dados em pipelines Python. Estes decoradores aceitam um dicionário Python como argumento, onde a chave é o nome da expectativa e o valor é a restrição de expectativa.

Use expect_all para especificar várias restrições de qualidade de dados quando os registros que falharem na validação devem ser incluídos no conjunto de dados de destino:

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Use expect_all_or_drop para especificar várias restrições de qualidade de dados quando os registros que falharem na validação devem ser descartados do conjunto de dados de destino:

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Use expect_all_or_fail para especificar várias restrições de qualidade de dados quando os registros que falham na validação devem interromper a execução do pipeline:

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Você também pode definir uma coleção de expectativas como uma variável e passá-la para uma ou mais consultas em seu pipeline:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

Colocar dados inválidos em quarentena

O exemplo a seguir usa expectativas em combinação com tabelas e exibições temporárias. Esse padrão fornece métricas para registros que passam por verificações de expectativa durante atualizações de pipeline e fornece uma maneira de processar registros válidos e inválidos por meio de diferentes caminhos downstream.

Nota

Este exemplo lê dados de exemplo incluídos nos conjuntos de dados Databricks. Como os conjuntos de dados Databricks não são suportados com um pipeline que publica no Unity Catalog, este exemplo funciona apenas com um pipeline configurado para publicar no metastore do Hive. No entanto, esse padrão também funciona com pipelines habilitados para Unity Catalog, mas você deve ler dados de locais externos. Para saber mais sobre como utilizar o Catálogo unity com Tabelas Delta Live, veja Utilizar o Catálogo Unity com os pipelines do Delta Live Tables.

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

Validar contagens de linhas entre tabelas

Você pode adicionar uma tabela adicional ao seu pipeline que define uma expectativa para comparar contagens de linhas entre duas exibições materializadas ou tabelas de streaming. Os resultados dessa expectativa aparecem no log de eventos e na interface do usuário do Delta Live Tables. O exemplo a seguir valida contagens de linhas iguais entre as tbla tabelas e tblb :

CREATE OR REFRESH MATERIALIZED VIEW count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

Execute a validação avançada com as expectativas do Delta Live Tables

Você pode definir visualizações materializadas usando consultas agregadas e de junção e usar os resultados dessas consultas como parte de sua verificação de expectativas. Isso é útil se você deseja executar verificações complexas de qualidade de dados, por exemplo, garantindo que uma tabela derivada contenha todos os registros da tabela de origem ou garantindo a igualdade de uma coluna numérica entre tabelas.

O exemplo a report seguir valida que todos os registros esperados estão presentes na tabela:

CREATE MATERIALIZED VIEW report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

O exemplo a seguir usa uma agregação para garantir a exclusividade de uma chave primária:

CREATE MATERIALIZED VIEW report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

Torne as expectativas portáteis e reutilizáveis

Você pode manter regras de qualidade de dados separadamente de suas implementações de pipeline.

O Databricks recomenda armazenar as regras em uma tabela Delta com cada regra categorizada por uma tag. Você usa essa marca nas definições do conjunto de dados para determinar quais regras devem ser aplicadas.

O exemplo a seguir cria uma tabela nomeada rules para manter regras:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

O exemplo Python a seguir define as expectativas de qualidade de dados com base nas regras armazenadas na rules tabela. A get_rules() função lê as regras da rules tabela e retorna um dicionário Python contendo regras correspondentes ao tag argumento passado para a função. O dicionário é aplicado nos @dlt.expect_all_*() decoradores para impor restrições de qualidade de dados. Por exemplo, todos os registros que falharem nas regras marcadas serão validity retirados da raw_farmers_market tabela:

Nota

Este exemplo lê dados de exemplo incluídos nos conjuntos de dados Databricks. Como os conjuntos de dados Databricks não são suportados com um pipeline que publica no Unity Catalog, este exemplo funciona apenas com um pipeline configurado para publicar no metastore do Hive. No entanto, esse padrão também funciona com pipelines habilitados para Unity Catalog, mas você deve ler dados de locais externos. Para saber mais sobre como utilizar o Catálogo unity com Tabelas Delta Live, veja Utilizar o Catálogo Unity com os pipelines do Delta Live Tables.

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

Em vez de criar uma tabela nomeada rules para manter regras, você pode criar um módulo Python para regras principais, por exemplo, em um arquivo nomeado rules_module.py na mesma pasta do bloco de anotações:

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

Em seguida, modifique o bloco de anotações anterior importando o módulo e alterando a get_rules() função para ler do módulo em vez da rules tabela:

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )