Verwalten der Datenqualität mit Delta Live Tables

Sie verwenden Erwartungen, um Einschränkungen der Datenqualität für den Inhalt eines Datasets zu definieren. Mithilfe von Erwartungen können Sie sicherstellen, dass die in den Tabellen eingehenden Daten die Anforderungen an die Datenqualität erfüllen, und Sie erhalten Einblicke in die Datenqualität für jedes Pipeline-Update. Sie wenden Erwartungen auf Abfragen an, die Python-Decorators oder SQL-Einschränkungsklauseln verwenden.

Was sind die Erwartungen von Delta Live Tables?

Erwartungen sind optionale Klauseln, die Sie den Delta Live Tables-Datensatzdeklarationen hinzufügen, um Datenqualitätsprüfungen auf jeden Datensatz anzuwenden, der eine Abfrage durchläuft.

Eine Erwartung besteht aus drei Dingen:

  • Eine Beschreibung, die als eindeutiger Bezeichner fungiert und Ihnen ermöglicht, Metriken für die Einschränkung nachzuverfolgen.
  • Eine boolesche Aussage, die auf der Grundlage einer angegebenen Bedingung immer „wahr“ oder „falsch“ zurückgibt.
  • Eine Aktion, die ausgeführt werden soll, wenn ein Datensatz die Erwartung nicht erfüllt, was bedeutet, dass der boolesche Wert „falsch“ zurückgibt.

Die folgende Matrix zeigt die drei Aktionen, die Sie auf ungültige Datensätze anwenden können:

Aktion Ergebnis
Warnen (Standard) Ungültige Datensätze werden in das Ziel geschrieben. Fehler werden als Metrik für das Dataset gemeldet.
Verwerfen Ungültige Datensätze werden verworfen, bevor Daten in das Ziel geschrieben werden. Fehler werden als Metriken für das Dataset gemeldet.
fail Ungültige Datensätze verhindern, dass das Update erfolgreich ausgeführt wird. Vor der erneuten Verarbeitung ist ein manueller Eingriff erforderlich.

Sie können Datenqualitätsmetriken wie die Anzahl der Datensätze, die eine Erwartung verletzen, anzeigen, indem Sie das Ereignisprotokoll von Delta Live Tables abfragen. Weitere Informationen dazu finden Sie unter Überwachen von Delta Live Tables-Pipelines.

Eine vollständige Referenz zur Syntax der Delta Live Tables-Dataset-Deklaration finden Sie unter Delta Live Tables Python-Sprachreferenz oder Delta Live Tables SQL-Sprachreferenz.

Hinweis

Sie können zwar mehrere Klauseln in jede Erwartung aufnehmen, aber nur Python unterstützt die Definition von Aktionen auf der Grundlage mehrerer Erwartungen. Siehe Mehrere Erwartungen.

Ungültige Datensätze beibehalten

Verwenden Sie den expect-Operator, wenn Sie Datensätze beibehalten möchten, die gegen die Erwartungen verstoßen. Datensätze, die gegen die Erwartungen verstoßen, werden dem Ziel-Dataset zusammen mit gültigen Datensätzen hinzugefügt:

Python

@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Ungültige Datensätze ablegen

Verwenden Sie den expect or drop-Operator, um die weitere Verarbeitung ungültiger Datensätze zu verhindern. Datensätze, die die Erwartungen verletzen, werden aus dem Ziel-Dataset gelöscht:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Fehler bei ungültigen Datensätzen

Wenn ungültige Datensätze inakzeptabel sind, verwenden Sie den expect or fail-Operator, um die Ausführung sofort zu stoppen, wenn ein Datensatz die Validierung nicht besteht. Wenn der Vorgang ein Tabellenupdate ist, führt das System atomisch ein Rollback der Transaktion aus:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Wenn eine Pipeline aufgrund einer Erwartungsverletzung fehlschlägt, müssen Sie den Pipelinecode korrigieren, um die ungültigen Daten ordnungsgemäß zu behandeln, bevor Sie die Pipeline erneut ausführen.

Erwartungen mit Fehlern ändern den Spark-Abfrageplan Ihrer Transformationen, um Informationen zu nachzuverfolgen, die für die Erkennung und Meldung von Verstößen erforderlich sind. Bei vielen Abfragen können Sie diese Informationen verwenden, um zu ermitteln, welcher Eingabedatensatz zu dem Verstoß führte. Im Anschluss finden Sie eine Beispielausnahme:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

Mehrere Erwartungen

Sie können in Python-Pipelines Erwartungen mit einer oder mehreren Datenqualitätseinschränkungen definieren. Diese Decorators akzeptieren ein Python-Wörterbuch als Argument, wobei der Schlüssel der Erwartungsname und der Wert die Erwartungseinschränkung ist.

Verwenden Sie expect_all, um mehrere Data Quality-Einschränkungen anzugeben, wenn Datensätze, bei denen die Überprüfung fehlschlägt, in das Ziel-Dataset aufgenommen werden sollen:

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Verwenden Sie expect_all_or_drop, um mehrere Data Quality-Einschränkungen anzugeben, wenn Datensätze, bei denen die Überprüfung fehlschlägt, aus dem Ziel-Dataset gelöscht werden sollen:

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Verwenden Sie expect_all_or_fail, um mehrere Data Quality-Einschränkungen anzugeben, wenn Datensätze, bei denen die Überprüfung fehlschlägt, aus dem Ziel-Dataset gelöscht werden sollen:

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Sie können auch eine Sammlung von Erwartungen als Variable definieren und an eine oder mehrere Abfragen in Ihrer Pipeline übergeben:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

Ungültige Daten unter Quarantäne stellen

Im folgenden Beispiel werden Erwartungen in Kombination mit temporären Tabellen und Ansichten verwendet. Dieses Muster stellt Metriken für Datensätze bereit, die Erwartungsprüfungen während Pipeline-Updates bestehen, und bietet eine Möglichkeit, gültige und ungültige Datensätze über verschiedene nachgelagerte Pfade zu verarbeiten.

Hinweis

In diesem Beispiel werden Beispieldaten gelesen, die in den Databricks-Datasets enthalten sind. Da die Databricks-Datasets nicht mit einer Pipeline unterstützt werden, die im Unity-Katalog veröffentlicht wird, funktioniert dieses Beispiel nur mit einer Pipeline, die für die Veröffentlichung im Hive-Metaspeicher konfiguriert ist. Dieses Muster funktioniert jedoch auch mit aktivierten Pipelines im Unity-Katalog, Sie müssen jedoch Daten von externen Speicherorten lesen. Weitere Informationen zur Verwendung von Unity Catalog mit Delta Live Tables finden Sie unter Verwenden von Unity Catalog mit Ihren Delta Live Tables-Pipelines.

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

Tabellenübergreifendes Überprüfen der Zeilenanzahl

Sie können Ihrer Pipeline eine zusätzliche Tabelle hinzufügen, die eine Erwartung definiert, um die Zeilenanzahl zwischen zwei Live-Tabellen zu vergleichen. Die Ergebnisse dieser Erwartung werden im Ereignisprotokoll und auf der Delta Live Tables-Benutzeroberfläche angezeigt. Das folgende Beispiel validiert gleiche Zeilenzahlen zwischen den Tabellen tbla und tblb:

CREATE OR REFRESH LIVE TABLE count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

Ausführen einer erweiterten Validierung mit Delta Live Tables-Erwartungen

Sie können Livetabellen mithilfe von Aggregat- und Verknüpfungsabfragen definieren und die Ergebnisse dieser Abfragen als Teil Ihrer Erwartungsprüfung verwenden. Dies ist nützlich, wenn Sie komplexe Datenqualitätsprüfungen durchführen möchten, z. B. um sicherzustellen, dass eine abgeleitete Tabelle alle Datensätze aus der Quelltabelle enthält, oder um die Gleichheit einer numerischen Spalte über Tabellen hinweg zu garantieren. Sie können das TEMPORARY-Schlüsselwort verwenden, um zu verhindern, dass diese Tabellen im Zielschema veröffentlicht werden.

Im folgenden Beispiel wird validiert, dass alle erwarteten Datensätze in der report-Tabelle vorhanden sind:

CREATE TEMPORARY LIVE TABLE report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

Im folgenden Beispiel wird ein Aggregat verwendet, um die Eindeutigkeit eines Primärschlüssels sicherzustellen:

CREATE TEMPORARY LIVE TABLE report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

Machen Sie Erwartungen portierbar und wiederverwendbar

Sie können Datenqualitätsregeln getrennt von Ihren Pipeline-Implementierungen verwalten.

Databricks empfiehlt, die Regeln in einer Delta-Tabelle zu speichern, wobei jede Regel nach einem Tag kategorisiert ist. Sie verwenden dieses Tag in Dataset-Definitionen, um zu bestimmen, welche Regeln angewendet werden sollen.

Im folgenden Beispiel wird eine Tabelle namens rules zur Verwaltung von Regeln erstellt:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

Das folgende Python-Beispiel definiert die Erwartungen an die Datenqualität auf der Grundlage der in der rules-Tabelle gespeicherten Regeln. Die get_rules()-Funktion liest die Regeln aus der rules-Tabelle und gibt ein Python-Wörterbuch zurück, das die Regeln enthält, die dem an die Funktion übergebenen tag-Argument entsprechen. Das Wörterbuch wird in den @dlt.expect_all_*()-Decorator-Elementen angewendet, um Datenqualitätseinschränkungen zu erzwingen. Beispielsweise werden alle Datensätze, die die mit validity gekennzeichneten Regeln nicht erfüllen, aus der raw_farmers_market-Tabelle gelöscht:

Hinweis

In diesem Beispiel werden Beispieldaten gelesen, die in den Databricks-Datasets enthalten sind. Da die Databricks-Datasets nicht mit einer Pipeline unterstützt werden, die im Unity-Katalog veröffentlicht wird, funktioniert dieses Beispiel nur mit einer Pipeline, die für die Veröffentlichung im Hive-Metaspeicher konfiguriert ist. Dieses Muster funktioniert jedoch auch mit aktivierten Pipelines im Unity-Katalog, Sie müssen jedoch Daten von externen Speicherorten lesen. Weitere Informationen zur Verwendung von Unity Catalog mit Delta Live Tables finden Sie unter Verwenden von Unity Catalog mit Ihren Delta Live Tables-Pipelines.

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

Anstatt eine Tabelle mit dem Namen rules zum Verwalten von Regeln zu erstellen, könnten Sie ein Python-Modul für Hauptregeln erstellen, z. B. in einer Datei mit dem Namen rules_module.py im selben Ordner wie das Notebook:

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

Ändern Sie dann das vorangehende Notebook, indem Sie das Modul importieren und die Funktion get_rules() ändern, um aus dem Modul statt aus der Tabelle rules zu lesen:

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )