Niveaux d’isolation et conflits d’écriture sur Azure Databricks

Le niveau d’isolation d’une table définit le degré auquel une transaction doit être isolée des modifications apportées par des opérations simultanées. Conflits d’écriture sur Azure Databricks en fonction du niveau d’isolation.

Delta Lake offre des garanties de transaction ACID entre les lectures et les écritures. Cela signifie que :

  • Plusieurs enregistreurs sur plusieurs clusters peuvent modifier simultanément une partition de table. Les enregistreurs voient une vue d’instantané cohérente de la table et les écritures se produisent dans un ordre série.
    • Les lecteurs continuent de voir une vue d’instantané cohérente de la table avec laquelle le travail Azure Databricks a commencé, même si une table est modifiée pendant le travail.

Consultez En quoi consistent les garanties ACID sur Azure Databricks ?.

Notes

Azure Databricks utilise Delta Lake pour toutes les tables par défaut. Cet article décrit le comportement de Delta Lake sur Azure Databricks.

Conflits d’écriture avec accès concurrentiel au niveau des lignes

L’accès concurrentiel au niveau des lignes réduit les conflits entre les opérations d’écriture simultanées en détectant les modifications au niveau des lignes et en résolvant automatiquement les conflits qui se produisent quand des écritures simultanées mettent à jour ou suppriment des lignes différentes dans le même fichier de données.

La concurrence au niveau des lignes est généralement disponible sur Databricks Runtime 14.2 et versions ultérieures. La concurrence au niveau des lignes est prise en charge par défaut pour les conditions suivantes :

  • Tables avec vecteurs de suppression activés et sans partitionnement.
  • Tables avec clustering liquide, sauf si vous avez désactivé les vecteurs de suppression.

Les tables avec des partitions ne prennent pas le charge l’accès concurrentiel au niveau des lignes, mais peuvent toujours éviter les conflits entre OPTIMIZE et toutes les autres opérations d’écriture quand les vecteurs de suppression sont activés. Consultez Limitations pour l’accès concurrentiel au niveau des lignes.

Pour les autres versions de Databricks Runtime, consultez Comportement d’aperçu de la concurrence au niveau des lignes (hérité).

La table suivante décrit les paires d’opérations d’écriture pouvant entrer en conflit dans chaque niveau d’isolation avec l’accès concurrentiel au niveau des lignes activé.

Remarque

Les tables avec colonnes d’identité ne prennent pas en charge les transactions simultanées. Consultez Utiliser des colonnes d’identité dans Delta Lake.

INSERT (1) UPDATE, DELETE, MERGE INTO OPTIMIZE
INSERT Conflit impossible
UPDATE, DELETE, MERGE INTO Conflit impossible avec WriteSerializable. Conflit possible dans Serializable lors de la modification de la même ligne ; consultez Limitations relatives à l’accès concurrentiel au niveau des lignes. MERGE INTO nécessite Photon pour la résolution des conflits au niveau des lignes. Conflit possible lors de la modification de la même ligne ; consultez Limitations relatives à l’accès concurrentiel au niveau des lignes.
OPTIMIZE Conflit impossible Conflit impossible Conflit impossible

Important

(1) Toutes les opérations INSERT dans les tables ci-dessus décrivent des opérations d’ajout qui ne lisent aucune donnée de la même table avant la validation. Les opérations INSERT qui contiennent des sous-requêtes lisant la même table prennent en charge la même concurrence que MERGE.

Conflits d’écriture sans accès concurrentiel au niveau des lignes

Le tableau suivant décrit les paires d’opérations d’écriture pouvant entrer en conflit dans chaque niveau d’isolation.

Les tables ne prennent pas en charge l’accès concurrentiel au niveau des lignes si elles ont des partitions définies et n’ont aucun vecteur de suppression activé. Databricks Runtime 14.2, une version ultérieure, est requis pour l’accès concurrentiel au niveau des lignes.

Remarque

Les tables avec colonnes d’identité ne prennent pas en charge les transactions simultanées. Consultez Utiliser des colonnes d’identité dans Delta Lake.

INSERT (1) UPDATE, DELETE, MERGE INTO OPTIMIZE
INSERT Conflit impossible
UPDATE, DELETE, MERGE INTO Conflit impossible avec WriteSerializable. Conflit possible dans Serializable, voir éviter les conflits avec les partitions. Conflit possible dans Serializable et WriteSerializable, voir éviter les conflits avec les partitions.
OPTIMIZE Conflit impossible Conflit impossible dans des tables avec des vecteurs de suppression activés. Conflit possible dans le cas contraire. Conflit impossible dans des tables avec des vecteurs de suppression activés. Conflit possible dans le cas contraire.

Important

(1) Toutes les opérations INSERT dans les tables ci-dessus décrivent des opérations d’ajout qui ne lisent aucune donnée de la même table avant la validation. Les opérations INSERT qui contiennent des sous-requêtes lisant la même table prennent en charge la même concurrence que MERGE.

Limitations pour l’accès concurrentiel au niveau des lignes

Certaines limitations s’appliquent à l’accès concurrentiel au niveau des lignes. Pour les opérations suivantes, la résolution des conflits suit la concurrence normale pour les conflits d’écriture sur Azure Databricks. Consultez Conflits d’écriture sans accès concurrentiel au niveau des lignes.

  • OPTIMIZE commande avec ZORDER BY.
  • Commandes avec des clauses conditionnelles complexes, y compris les suivantes :
    • Conditions sur des types de données complexes tels que des structs, des tableaux ou des mappages.
    • Conditions utilisant des expressions et des sous-requêtes non déterministes.
    • Conditions qui contiennent des sous-requêtes corrélées.
  • Pour les commandes MERGE, vous devez utilisez un prédicat explicite sur la table cible pour filtrer des lignes correspondant à la table source. Pour la résolution par fusion, le filtre est utilisé pour analyser uniquement les lignes qui peuvent avoir des conflits basés sur des conditions de filtre dans des opérations simultanées.

Remarque

La détection des conflits au niveau des lignes peut augmenter le temps d’exécution total. Dans le cas de nombreuses transactions simultanées, l’enregistreur hiérarchise la latence par rapport à la résolution des conflits et des conflits peuvent se produire.

Toutes les limitations des vecteurs de suppression s’appliquent également. Voir Limitations.

Quand Delta Lake effectue-t-il une validation sans lire la table ?

Les opérations d’ajout ou INSERT de Delta Lake ne lisent pas l’état de la table avant la validation si les conditions suivantes sont remplies :

  1. La logique est exprimée à l’aide de la logique SQL INSERT ou du mode append.
  2. La logique ne contient aucune sous-requête ou expression conditionnelle qui référence la table ciblée par l’opération d’écriture.

Comme pour d’autres validations, Delta Lake valide et résout les versions de table au moment de la validation à l’aide des métadonnées présentes dans le journal des transactions, mais aucune version de la table n’est réellement lue.

Notes

De nombreux modèles courants utilisent des opérations MERGE pour insérer des données en fonction des conditions de la table. Bien qu’il soit possible de réécrire cette logique à l’aide d’instructions INSERT, si une expression conditionnelle fait référence à une colonne de la table cible, ces déclarations ont les mêmes limitations de concurrence que les MERGE.

Niveaux d’isolation Write Serializable et Serializable

Le niveau d'isolement d'une table définit le degré auquel une transaction doit être isolée des modifications effectuées par des transactions concurrentes. Delta Lake sur Azure Databricks prend en charge deux niveaux d’isolation : Serializable et WriteSerializable.

  • Serializable: niveau d’isolation le plus élevé. Il garantit que les opérations d’écriture validées et toutes les lectures sont sérialisables. Les opérations sont autorisées tant qu’il existe une séquence série de leur exécution un à la fois qui génère le même résultat que celui observé dans la table. Pour les opérations d’écriture, la séquence en série est exactement la même que celle qui s’affiche dans l’historique de la table.

  • WriteSerializable (par défaut): niveau d’isolation plus faible que sérialisable. Elle garantit uniquement que les opérations d’écriture (autrement dit, les non-lectures) sont sérialisables. Toutefois, cette fonctionnalité est toujours plus puissante que l’isolement d' instantané . WriteSerializable est le niveau d’isolation par défaut, car il fournit un excellent équilibre entre la cohérence et la disponibilité des données pour les opérations les plus courantes.

    Dans ce mode, le contenu de la table Delta peut être différent de celui qui est attendu à partir de la séquence d’opérations détectées dans l’historique de table. Cela est dû au fait que ce mode autorise certaines paires d’écritures simultanées (par exemple, Operations X et Y) à continuer de façon à ce que le résultat soit égal à si Y a été effectué avant X (autrement dit, sérialisable), même si l’historique indique que Y a été validé après X. Pour interdire cette réorganisation, Définissez le niveau d’isolation de la table de façon à ce qu’il soit sérialisable pour provoquer l’échec de ces transactions.

Les opérations de lecture utilisent toujours l’isolement d’instantané. Le niveau d’isolement d’écriture détermine s’il est possible pour un lecteur de voir une capture instantanée d’une table, en fonction de l’historique, « jamais existé ».

Pour le niveau sérialisable, un lecteur ne voit toujours que les tables qui sont conformes à l’historique. Pour le niveau WriteSerializable, un lecteur peut voir une table qui n’existe pas dans le journal Delta.

Par exemple, envisagez d’utiliser TxN1, une suppression longue et txn2, qui insère des données supprimées par TxN1. txn2 et TxN1 sont terminés et sont enregistrés dans cet ordre dans l’historique. D’après l’historique, les données insérées dans txn2 ne doivent pas exister dans la table. Pour un niveau sérialisable, un lecteur ne voit jamais les données insérées par txn2. Toutefois, pour le niveau WriteSerializable, un lecteur peut voir les données insérées par txn2.

Pour plus d’informations sur les types d’opérations qui peuvent entrer en conflit l’une avec l’autre dans chaque niveau d’isolation et les erreurs possibles, consultez Éviter les conflits en utilisant le partitionnement et des conditions de commandes disjointes.

Définir le niveau d’isolation

Vous définissez le niveau d’isolation à l’aide de la commande ALTER TABLE.

ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = <level-name>)

<level-name> est Serializable ou WriteSerializable.

Par exemple, pour modifier le niveau d’isolement par défaut WriteSerializable à Serializable , exécutez :

ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')

Éviter les conflits en utilisant le partitionnement et des conditions de commandes disjointes

Dans tous les cas marqués « Conflit possible », le conflit entre les deux opérations dépend du fait qu’elles opèrent ou non sur le même ensemble de fichiers. Vous pouvez rendre les deux ensembles de fichiers disjoints en partitionnant la table selon les mêmes colonnes que celles utilisées dans les conditions des opérations. Par exemple, les deux commandes UPDATE table WHERE date > '2010-01-01' ... et DELETE table WHERE date < '2010-01-01' seront en conflit si la table n’est pas partitionnée par date, car toutes deux peuvent tenter de modifier le même ensemble de fichiers. Le partitionnement de la table par date permet d’éviter le conflit. Par conséquent, le partitionnement d’une table selon les conditions couramment utilisées sur la commande peut réduire considérablement les conflits. Toutefois, le partitionnement d’une table par une colonne qui a une cardinalité élevée peut entraîner d’autres problèmes de performance en raison du grand nombre de sous-répertoires.

Exceptions de conflits

En cas de conflit de transactions, vous observerez l’une des exceptions suivantes :

ConcurrentAppendException

Cette exception se produit lorsqu’une opération simultanée ajoute des fichiers dans la même partition (ou n’importe où dans une table non partitionnée) que celle que votre opération lit. Les ajouts de fichiers peuvent être causés par des opérations INSERT, DELETE, UPDATE ou MERGE.

Avec le niveau d’isolation par défaut WriteSerializable, les fichiers ajoutés par des opérations INSERTaveugles (c’est-à-dire des opérations qui ajoutent aveuglément des données sans en lire aucune) n’entrent en conflit avec aucune opération, même si elles touchent la même partition (ou n’importe où dans une table non partitionnée). Si le niveau d’isolation est défini sur Serializable, les ajouts aveugles peuvent entrer en conflit.

Cette exception est souvent levée pendant des opérations DELETE, UPDATE ou MERGE simultanées. Bien que les opérations simultanées puissent mettre à jour physiquement des répertoires de partition différents, l’une d’entre elles peut lire la même partition que celle que l’autre met à jour simultanément, provoquant ainsi un conflit. Vous pouvez éviter cela en rendant la séparation explicite dans la condition de l’opération. Considérez l'exemple suivant.

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

Supposons que vous exécutez le code ci-dessus simultanément pour des dates ou des pays différents. Comme chaque tâche travaille sur une partition indépendante sur la table Delta cible, vous ne prévoyez aucun conflit. Toutefois, la condition n’est pas assez explicite et peut analyser l’intégralité de la table et entrer en conflit avec les opérations simultanées mettant à jour d’autres partitions. Au lieu de cela, vous pouvez réécrire votre instruction pour ajouter une date et un pays spécifiques à la condition de fusion, comme illustré dans l’exemple suivant.

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

Cette opération peut désormais être exécutée en toute sécurité à des dates et dans des pays différents.

ConcurrentDeleteReadException

Cette exception se produit lorsqu’une opération simultanée a supprimé un fichier que votre opération a lu. Les causes courantes sont une opération DELETE, UPDATE ou MERGE qui réécrit les fichiers.

ConcurrentDeleteDeleteException

Cette exception se produit lorsqu’une opération simultanée a supprimé un fichier que votre opération supprime également. Cela peut être dû au fait que deux opérations de compression simultanées réécrivent les mêmes fichiers.

MetadataChangedException

Cette exception se produit lorsqu’une transaction simultanée met à jour les métadonnées d’une table Delta. Les causes courantes sont les opérations ALTER TABLE ou les écritures dans votre table Delta qui mettent à jour le schéma de la table.

ConcurrentTransactionException

Si une requête de streaming utilisant le même emplacement de point de contrôle démarre plusieurs fois simultanément et tente d’écrire dans la table Delta au même moment. Il ne faut jamais que deux requêtes de streaming utilisent le même emplacement de point de contrôle et s’exécutent en même temps.

ProtocolChangedException

Cette exception peut se produire dans les cas suivants :

  • Lorsque votre table Delta est mise à niveau vers une nouvelle version du protocole. Pour que les futures opérations aboutissent, vous devrez peut-être mettre à niveau votre version de Databricks Runtime.
  • Lorsque plusieurs enregistreurs créent ou remplacent une table au même moment.
  • Lorsque plusieurs enregistreurs écrivent en même temps dans un chemin d’accès vide.

Pour plus d’informations, consultez Comment Azure Databricks gère-t-il la compatibilité de la fonctionnalité Delta Lake ?

Comportement d’aperçu de la concurrence au niveau des lignes (hérité)

Cette section décrit des comportements de préversion pour l’accès concurrentiel au niveau des lignes dans Databricks Runtime 14.1 et versions ultérieures. L’accès concurrentiel au niveau des lignes nécessite toujours des vecteurs de suppression.

Dans Databricks Runtime 13.3 LTS et versions ultérieures, les tables avec clustering liquide activé activent automatiquement l’accès concurrentiel au niveau des lignes.

Dans Databricks Runtime 14.0 et 14.1, vous pouvez activer l’accès concurrentiel au niveau des lignes pour des tables avec des vecteurs de suppression en définissant la configuration suivante pour le cluster ou SparkSession :

spark.databricks.delta.rowLevelConcurrencyPreview = true

Dans Databricks Runtime 14.1 et versions ultérieures, le calcul non Photon prend uniquement en charge l’accès concurrentiel au niveau des lignes pour les opérations DELETE.