Partager via


Optimiser le traitement d'état avec des filigranes

Pour gérer efficacement les données conservées dans l’état, utilisez des filigranes lors de l’exécution d’un traitement de flux avec état dans les pipelines déclaratifs Spark Lakeflow, notamment les agrégations, les jointures et la déduplication. Cet article explique comment utiliser des filigranes dans vos requêtes de pipeline et inclut des exemples d’opérations recommandées.

Note

Pour vous assurer que les requêtes qui effectuent des agrégations sont traitées de manière incrémentielle et non entièrement recomputées avec chaque mise à jour, vous devez utiliser des filigranes.

Qu’est-ce qu’un filigrane ?

Dans le traitement de flux, un filigrane est une fonctionnalité Apache Spark qui peut définir un seuil basé sur le temps pour le traitement des données lors de l’exécution d’opérations avec état telles que des agrégations. Les données arrivant sont traitées jusqu’à ce que le seuil soit atteint, à quel moment la fenêtre de temps définie par le seuil est fermée. Les filigranes peuvent être utilisés pour éviter les problèmes lors du traitement des requêtes, principalement lors du traitement de jeux de données plus volumineux ou d’un traitement de longue durée. Ces problèmes peuvent inclure une latence élevée dans la production de résultats et même des erreurs de mémoire insuffisante (OOM) en raison de la quantité de données conservées dans l’état pendant le traitement. Étant donné que les données en flux continu sont intrinsèquement non triées, les filigranes prennent également en charge le calcul correct des opérations telles que les agrégations par fenêtres temporelles.

Pour en savoir plus sur l’utilisation de filigranes dans le traitement de flux, consultez Filigrane dans Apache Spark Structured Streaming et Appliquer des filigranes pour contrôler les seuils de traitement des données.

Comment définir un filigrane ?

Vous définissez un filigrane en spécifiant un champ d’horodatage et une valeur représentant le seuil de temps pour que les données tardives arrivent. Les données sont considérées en retard si elles arrivent après le seuil de temps défini. Par exemple, si le seuil est défini sur 10 minutes, les enregistrements arrivant après le seuil de 10 minutes peuvent être supprimés.

Étant donné que les enregistrements qui arrivent après le seuil défini peuvent être supprimés, la sélection d’un seuil qui répond à votre latence et les exigences de correction est importante. Le choix d’un seuil plus petit entraîne l’émission d’enregistrements plus tôt, mais signifie également que les enregistrements en retard sont plus susceptibles d’être supprimés. Un seuil plus élevé signifie une attente plus longue, mais peut-être plus complète des données. En raison de la taille d’état supérieure, un seuil plus élevé peut également nécessiter des ressources informatiques supplémentaires. Étant donné que la valeur de seuil dépend de vos besoins en matière de données et de traitement, le test et la surveillance de votre traitement sont importants pour déterminer un seuil optimal.

Vous utilisez la withWatermark() fonction en Python pour définir un filigrane. Dans SQL, utilisez la WATERMARK clause pour définir un filigrane :

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Utiliser des filigranes avec des jointures entre flux

Pour les jointures de flux de données, il est nécessaire de définir un marqueur de progression sur les deux côtés de la jointure et une clause d'intervalle de temps. Étant donné que chaque source de jointure a une vue incomplète des données, la clause d’intervalle de temps est nécessaire pour indiquer au moteur de diffusion en continu quand aucune correspondance supplémentaire ne peut être effectuée. La clause d’intervalle de temps doit utiliser les mêmes champs que ceux utilisés pour définir les filigranes.

Étant donné qu’il peut arriver que chaque flux nécessite des seuils différents pour les marques de niveau d'eau, les flux n’ont pas besoin d’avoir les mêmes seuils. Pour éviter les données manquantes, le moteur de diffusion en continu conserve un filigrane global basé sur le flux le plus lent.

L’exemple suivant joint un flux d’impressions publicitaires et un flux de clics utilisateur sur les publicités. Dans cet exemple, un clic doit se produire dans les 3 minutes suivant l’impression. Une fois l’intervalle de temps de 3 minutes passé, les lignes de l’état qui ne peuvent plus être mises en correspondance sont supprimées.

Python

from pyspark import pipelines as dp

dp.create_streaming_table("adImpressionClicks")
@dp.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Effectuer des agrégations par fenêtres avec des filigranes

Une opération d'état courante sur les données de streaming est une agrégation fenêtrée. Les agrégations fenêtrées sont similaires aux agrégations groupées, sauf que les valeurs d’agrégation sont retournées pour l’ensemble des lignes qui font partie de la fenêtre définie.

Une fenêtre peut être définie comme une certaine longueur et une opération d’agrégation peut être effectuée sur toutes les lignes qui font partie de cette fenêtre. Spark Streaming prend en charge trois types de fenêtres :

  • Fenêtres fixes basculantes : une série d’intervalles de temps successifs et de taille fixe, sans chevauchement. Un enregistrement d’entrée appartient à une seule fenêtre.
  • Fenêtres glissantes : comme les fenêtres bascules, les fenêtres glissantes sont de taille fixe, mais les fenêtres peuvent se chevaucher et un enregistrement peut tomber dans plusieurs fenêtres.

Lorsque les données arrivent au-delà de la fin de la fenêtre, plus la longueur du filigrane, aucune nouvelle donnée n’est acceptée pour la fenêtre, le résultat de l’agrégation est émis et l’état de la fenêtre est supprimé.

L’exemple suivant calcule une somme d’impressions toutes les 5 minutes à l’aide d’une fenêtre fixe. Dans cet exemple, la clause select utilise l’alias impressions_window, puis la fenêtre elle-même est définie dans le cadre de la GROUP BY clause. La fenêtre doit être basée sur la même colonne d’horodatage que le filigrane, la colonne clickTimestamp dans cet exemple.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, window(clickTimestamp, "5 minutes") as impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Exemple similaire dans Python pour calculer les bénéfices sur les fenêtres fixes horaires :

from pyspark import pipelines as dp

@dp.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Dédupliquer les enregistrements de diffusion en continu

Structured Streaming offre des garanties de traitement exactement une seule fois mais ne déduplique pas automatiquement les enregistrements provenant des sources de données. Par exemple, étant donné que de nombreuses files d’attente de messages ont au moins une fois des garanties, les enregistrements en double doivent être attendus lors de la lecture de l’une de ces files d’attente de messages. Vous pouvez utiliser la dropDuplicatesWithinWatermark() fonction pour dédupliquer les enregistrements sur n’importe quel champ spécifié, en supprimant les doublons d’un flux même si certains champs diffèrent (par exemple, l’heure de l’événement ou l’heure d’arrivée). Vous devez spécifier un filigrane pour utiliser la dropDuplicatesWithinWatermark() fonction. Toutes les données en double qui arrivent dans l’intervalle de temps spécifié par le filigrane sont supprimées.

Les données ordonnées sont importantes, car les données non ordonnées provoquent une avance incorrecte de la valeur de filigrane. Ensuite, lorsque des données plus anciennes arrivent, elles sont considérées comme tardives et supprimées. Utilisez l’option withEventTimeOrder pour traiter l’instantané initial dans l’ordre en fonction de l’horodatage spécifié dans le filigrane. L’option withEventTimeOrder peut être déclarée dans le code définissant le jeu de données ou dans les paramètres de pipeline à l’aide spark.databricks.delta.withEventTimeOrder.enabledde . Par exemple:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Note

L’option withEventTimeOrder est prise en charge uniquement avec Python.

Dans l’exemple suivant, les données sont traitées par ordre clickTimestampet les enregistrements arrivant dans les 5 secondes entre eux qui contiennent des doublons userId et clickAdId des colonnes sont supprimés.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Optimiser la configuration du pipeline pour le traitement avec état

Pour éviter les problèmes de production et une latence excessive, Databricks recommande d’activer la gestion d’état basée sur RocksDB pour votre traitement de flux avec état, en particulier si votre traitement nécessite d’économiser une grande quantité d’état intermédiaire.

Les pipelines serverless gèrent automatiquement les configurations du magasin d’états.

Vous pouvez activer la gestion de l’état basé sur RocksDB en définissant la configuration suivante avant de déployer un pipeline :

{
  "configuration": {
    "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

Pour en savoir plus sur le magasin d’états RocksDB, notamment les recommandations de configuration pour RocksDB, consultez Configurer le magasin d’état RocksDB sur Azure Databricks.