Partager via


Mettre à jour des travaux lorsque vous mettez à niveau des espaces de travail hérités vers le catalogue Unity

Lorsque vous mettez à niveau des espaces de travail hérités vers le catalogue Unity, vous devrez peut-être mettre à jour des travaux existants pour référencer des tables et des chemins de fichiers mis à niveau. Le tableau suivant répertorie les scénarios et suggestions classiques de mise à jour de vos travaux.

Scénario Solution
Le travail utilise un notebook qui a des références à des bibliothèques personnalisées via un script init ou des bibliothèques définies par cluster.
Une bibliothèque personnalisée serait définie comme un package ou un fichier jar non disponible pip publiquement qui effectue des opérations de lecture ou d’écriture Apache Spark ou SQL incorporées dans son code.
Modifiez la bibliothèque personnalisée pour vous assurer que :
  • Les noms de base de données sont des espaces de noms à trois niveaux.
  • Les points de montage ne sont pas utilisés dans le code.
Le travail utilise un bloc-notes qui lit ou écrit dans une table de metastore Hive.
  • Évaluez la définition du catalogue par défaut dans la configuration Spark du cluster de travaux : spark.databricks.sql.initial.catalog.name my_catalog
  • Évaluez si le catalogue par défaut de l’espace de travail peut être défini sur autre que hive_metastore afin que le code du travail ne soit pas obligé d’être modifié.
  • Sinon, modifiez le code du travail pour renommer des espaces de noms de deux niveaux en espaces de noms de trois niveaux de la table appropriée.
  • Si le travail utilise sql pur, envisagez d’ajouter une USE CATALOG instruction.
Le travail utilise un bloc-notes qui lit ou écrit dans des chemins d’accès qui sont des sous-dossiers de tables. Cela n’est pas possible dans le catalogue Unity.
  • Modifiez le code pour lire à partir de la table avec un prédicat sur la colonne de partition.
  • Modifiez le code pour écrire dans la table avec overwriteByPartition ou une autre option appropriée.
Le travail utilise un notebook qui lit ou écrit pour monter des chemins d’accès inscrits dans le catalogue Unity
  • Modifiez le code pour faire référence à la table d’espaces de noms à trois niveaux correcte.
  • Si la table n’est pas inscrite ou ne le sera pas, le code doit toujours être modifié pour écrire dans un chemin de volume au lieu d’un chemin de montage.
Le travail utilise un notebook qui lit ou écrit des fichiers, et non des tables, via des chemins de montage. Modifiez le code pour écrire dans un emplacement de volume à la place.
Le travail est un travail de diffusion en continu qui utilise applyInPandasWithState. N’est pas pris en charge actuellement. Envisagez la réécriture si possible ou n’essayez pas de refactoriser ce travail tant que la prise en charge n’est pas fournie.
Le travail est un travail de streaming qui utilise le mode de traitement continu. Le mode de traitement continu est expérimental dans Spark et n’est pas pris en charge dans le catalogue Unity. Refactorisez la tâche pour utiliser la diffusion en continu structurée. Si ce n’est pas possible, envisagez de conserver le travail en cours d’exécution sur le metastore Hive.
Le travail est un travail de streaming qui utilise des répertoires de point de contrôle.
  • Déplacez les répertoires de point de contrôle vers des volumes.
  • Modifiez le code dans le bloc-notes pour utiliser un chemin d’accès au volume.
  • Le propriétaire du travail doit avoir une lecture-écriture sur ce chemin.
  • Arrêtez le travail.
  • Déplacez le point de contrôle vers le nouvel emplacement du volume.
  • Redémarrez le travail.
Le travail a une définition de cluster sous Databricks Runtime 11.3.
  • Remplacez la définition du cluster de travaux par Databricks Runtime 11.3 ou version ultérieure.
  • Modifiez la définition du cluster de travaux pour utiliser un mode d’accès désigné ou standard.
Le travail comporte des notebooks qui interagissent avec le stockage ou les tables. Le principal de service sur lequel le travail s’exécutait doit être fourni avec un accès en lecture et en écriture aux ressources requises dans le catalogue Unity, telles que les volumes, les tables, les emplacements externes, etc.
Le travail est un pipeline déclaratif Lakeflow.
  • Remplacez le cluster de travaux par Databricks Runtime 13.1 ou version ultérieure.
  • Arrêtez le travail Pipelines déclaratifs Lakeflow.
  • Déplacez les données vers une table managée Du catalogue Unity.
  • Modifiez la définition du travail Pipelines déclaratifs Lakeflow pour utiliser la nouvelle table managée du catalogue Unity.
  • Redémarrez le travail Pipelines déclaratifs Lakeflow.
Le travail comporte des notebooks qui utilisent des services cloud non-stockage, tels que AWSKinesis, et la configuration utilisée pour se connecter utilise un profil d’instance.
  • Modifiez le code pour utiliser les informations d’identification du service catalogue Unity, qui régissent les informations d’identification capables d’interagir avec des services cloud non-stockage en générant des informations d’identification temporaires utilisables par les kits SDK.
Le travail utilise Scala
  • Si vous êtes inférieur à Databricks Runtime 13.3, exécutez le calcul dédié.
  • Les clusters standard sont pris en charge sur Databricks Runtime 13.3 et versions ultérieures.
Le travail comporte des notebooks qui utilisent des fonctions définies par l’utilisateur Scala
  • Si vous êtes inférieur à Databricks Runtime 13.3, exécutez le calcul dédié.
  • Les clusters standard sont pris en charge sur Databricks Runtime 14.2.
Le travail a des tâches qui utilisent MLR Exécutez sur un calcul dédié.
Le travail a une configuration de cluster qui s’appuie sur des scripts d’init globaux.
  • Utilisez Databricks Runtime 13.3 ou version ultérieure pour une prise en charge complète.
  • Modifiez pour utiliser des scripts init étendus au cluster ou utilisez des stratégies de cluster. Les scripts, les fichiers et les packages doivent être installés sur des volumes de catalogue Unity pour s’exécuter.
Le travail a une configuration de cluster ou des notebooks qui utilisent des fichiers jar/Maven, des extensions Spark ou des sources de données personnalisées (à partir de Spark).
  • Utilisez Databricks Runtime 13.3 ou version ultérieure.
  • Utilisez des stratégies de cluster pour installer des bibliothèques.
Le travail comporte des notebooks qui utilisent des fonctions définies par l’utilisateur PySpark. Utilisez Databricks Runtime 13.2 ou version ultérieure.
Le travail comporte des notebooks qui ont du code Python qui effectue des appels réseau. Utilisez Databricks Runtime 12.2 ou version ultérieure.
Le travail comporte des notebooks qui utilisent des fonctions définies par l’utilisateur Pandas (scalaire). Utilisez Databricks Runtime 13.2 ou version ultérieure.
Le travail utilise les volumes de catalogue Unity. Utilisez Databricks Runtime 13.3 ou version ultérieure.
Le travail comporte des notebooks qui utilisent spark.catalog.X (tableExists, listTables, setDefaultCatalog) et s’exécutent à l’aide d’un cluster partagé
  • Utilisez Databricks Runtime 14.2 ou version ultérieure.
  • Si la mise à niveau de Databricks Runtime n’est pas possible, procédez comme suit :
    Au lieu de tableExistscela, utilisez le code suivant :
    # SQL workaround
    def tableExistsSql(tablename):
    try:
    spark.sql(f"DESCRIBE TABLE {tablename};")
    except Exception as e:
    return False
    return True
    tableExistsSql("jakob.jakob.my_table")
    Au lieu de , utilisez SHOW TABLES (permet également de restreindre la correspondance de listTablesbase de données ou de modèle) :
    spark.sql("SHOW TABLES")
    Pour setDefaultCatalog, exécutez
    spark.sql("USE CATALOG ")
Le travail comporte des notebooks qui utilisent l’API DButils interne : contexte de commande et exécution à l’aide d’un cluster partagé.
Charges de travail qui tentent d’accéder à l’accès aux commandes, par exemple pour récupérer un ID de travail, à l’aide de
dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()
Au lieu de .toJson(), utilisez .safeToJson(). Cela fournit un sous-ensemble de toutes les informations de contexte de commande qui peuvent être partagées en toute sécurité sur un cluster partagé.
Nécessite Databricks Runtime 13.3 LTS+
Le travail comporte des notebooks qui utilisent PySpark : spark.udf.registerJavaFunction et s’exécute à l’aide d’un cluster partagé
  • Utiliser Databricks Runtime 14.3 LTS ou version ultérieure
  • Pour les blocs-notes et les travaux, utilisez une cellule %scala pour inscrire la fonction UDF Scala à l’aide de spark.udf.register. Comme Python et Scala partagent le contexte d’exécution, la fonction UDF Scala sera également disponible à partir de Python.
  • Pour les clients qui utilisent des IDEs (à l’aide de Databricks Connect v2), la seule option consiste à réécrire la fonction UDF en tant qu’UDF python du catalogue Unity. À l’avenir, nous prévoyons d’étendre la prise en charge des fonctions définies par l’utilisateur du catalogue Unity pour prendre en charge Scala.
Le travail comporte des notebooks qui utilisent des RDD : sc.parallelize &spark.read.json() pour convertir un objet JSON en DF et s’exécute à l’aide d’un cluster partagé
  • Utiliser json.loads à la place

Exemple-
Avant :
json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"
json_list = []
json_list.append(json_content1)
json_list.append(json_content2)
df = spark.read.json(sc.parallelize(json_list))
display(df)
Après :
from pyspark.sql import Row
import json
# Sample JSON data as a list of dictionaries (similar to JSON objects)
json_data_str = response.text
json_data = [json.loads(json_data_str)]
# Convert dictionaries to Row objects
rows = [Row(**json_dict) for json_dict in json_data]
# Create DataFrame from list of Row objects
df = spark.createDataFrame(rows)
df.display()
Le travail comporte des notebooks qui utilisent des RDD : Trame de données vide via sc.emptyRDD() et s’exécute à l’aide d’un cluster partagé Exemple-
Avant :
val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(sc.emptyRDD[Row], schema)
Après :
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(new java.util.ArrayList[Row](), schema)
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("k", StringType(), True)])
spark.createDataFrame([], schema)
Le travail comporte des notebooks qui utilisent des RDD : mapPartitions (logique d’initialisation coûteuse + opérations moins coûteuses par ligne) et s’exécute à l’aide d’un cluster partagé
  • Raison-
    Clusters partagés de catalogue Unity à l’aide de Spark Connect pour la communication entre les programmes Python/Scala et le serveur Spark, ce qui rend les RDD plus accessibles.
    Avant :
    Un cas d’usage classique pour les RDD consiste à exécuter une logique d’initialisation coûteuse une seule fois, puis à effectuer des opérations moins coûteuses par ligne. Un tel cas d’usage peut appeler un service externe ou initialiser une logique de chiffrement.
    Après :
    Réécrire les opérations RDD à l’aide de l’API Dataframe et de l’utilisation des fonctions définies par l’utilisateur de flèche native PySpark.
Job a des notebooks qui utilisent SparkContext (sc) &sqlContext et s’exécute à l’aide d’un cluster partagé
  • Raison-
    Le contexte Spark (sc) et sqlContext ne sont pas disponibles par conception en raison de l’architecture de cluster partagé du catalogue Unity et de SparkConnect.
    Guide pratique pour résoudre les problèmes suivants :
    Utiliser la variable Spark pour interagir avec l’instance SparkSession
    Limites :
    La JVM Spark n’est pas accessible directement à partir de Python/Scala REPL, uniquement par le biais de commandes Spark. Cela signifie que sc._jvm commandes échouent par conception.
    Les commandes sc suivantes ne sont pas prises en charge : emptyRDD, range, init_batched_serializer, parallelize, pickleFile, textFile, wholeTextFiles, binaryFiles, binaryRecords, sequenceFile, newAPIHadoopFile, newAPIHadoopRDD, hadoopFile, hadoopRDD, union, runJob, setSystemProperty, uiWebUrl, stop, setJobGroup, setLocalProperty, getConf
Le travail comporte des notebooks qui utilisent Spark Conf - sparkContext.getConf et s’exécute à l’aide d’un cluster partagé
  • Raison-
    sparkContext, df.sparkContext, sc.sparkContext et les API similaires ne sont pas disponibles par conception.
    Guide pratique pour résoudre les problèmes suivants :
    Utiliser spark.conf à la place
Job a des notebooks qui utilisent SparkContext - SetJobDescription() et s’exécute à l’aide d’un cluster partagé
  • Raison-
    sc.setJobDescription(« String ») n’est pas disponible par conception en raison de l’architecture de cluster partagé du catalogue Unity et de SparkConnect.
    Guide pratique pour résoudre les problèmes suivants :
    Utilisez plutôt des balises si possible [Documentation PySpark]
    spark.addTag() peut attacher une balise, et getTags() et interruptTag(tag) peuvent être utilisés pour agir sur la présence/l’absence d’une balise
    Nécessite Databricks Runtime 14.1+
Le travail comporte des notebooks qui définissent les niveaux de journal Spark à l’aide de commandes, telles que sc.setLogLevel(« INFO »), et qui s’exécute à l’aide d’un cluster partagé
  • Raison-
    Dans Single-User et aucun cluster d’isolation, il est possible d’accéder au contexte Spark pour définir dynamiquement le niveau de journal entre les pilotes et les exécuteurs directement. Sur les clusters partagés, cette méthode n’a pas été accessible à partir du contexte Spark, et dans Databricks Runtime 14+ le contexte Spark n’est plus disponible.
    Guide pratique pour résoudre les problèmes suivants :
    Pour contrôler le niveau de journal sans fournir de log4j.conf, il est désormais possible d’utiliser une valeur de configuration Spark dans les paramètres du cluster. Utilisez les niveaux de journal Spark en définissant spark.log.level sur DEBUG, WARN, INFO, ERROR en tant que valeur de configuration Spark dans les paramètres du cluster.
Le travail a des notebooks qui utilisent des expressions / requêtes profondément imbriquées et s’exécute à l’aide d’un cluster partagé
  • Raison-
    RecursionError / Protobuf niveau d’imbrication maximal dépassé (pour les expressions /requêtes profondément imbriquées)
    Lorsque vous créez de façon récursive des DataFrames et des expressions profondément imbriquées à l’aide de l’API DataFrame PySpark, il est possible que l’un des cas suivants se produise dans certains cas :
    • Exception Python : RecursionError : profondeur maximale de récursivité dépassée
    • SparkConnectGprcException : Niveau d’imbrication maximal protobuf dépassé

    Guide pratique pour résoudre les problèmes suivants :
    Pour contourner le problème, identifiez les chemins de code profondément imbriqués et réécritez-les à l’aide d’expressions linéaires/ de sous-requêtes ou de vues temporaires.
    Par exemple : au lieu d’appeler récursivement df.withColumn, appelez df.withColumns(dict) à la place.
Le travail comporte des notebooks qui utilisent input_file_name() dans le code et s’exécute à l’aide d’un cluster partagé
  • Raison-
    input_file_name() n’est pas pris en charge dans le catalogue Unity pour les clusters partagés.
    Guide pratique pour résoudre les problèmes suivants :
    Pour obtenir le nom du fichier
    .withColumn("RECORD_FILE_NAME", col("_metadata.file_name"))
    fonctionnera pour spark.read
    Pour obtenir l’intégralité du chemin d’accès au fichier
    .withColumn("RECORD_FILE_PATH", col("_metadata.file_path"))
    fonctionnera pour spark.read
Le travail comporte des notebooks qui effectuent des opérations de données sur des systèmes de fichiers DBFS et s’exécutent à l’aide d’un cluster partagé
  • Raison-
    Lorsque vous utilisez DBFS avec un cluster partagé à l’aide du service FUSE, il ne peut pas atteindre le système de fichiers et génère une erreur introuvable dans le fichier
    Exemple :
    Voici quelques exemples d’échecs de l’utilisation de l’accès au cluster partagé à DBFS
    with open('/dbfs/test/sample_file.csv', 'r') as file:
    ls -ltr /dbfs/test
    cat /dbfs/test/sample_file.csv
    Guide pratique pour résoudre les problèmes suivants :
    Utilisation de l’une ou l’autre -
    • Volume catalogue Databricks Unity au lieu d’utiliser DBFS (préféré)
    • Mettez à jour le code pour utiliser dbutils ou spark qui passe par le chemin d’accès direct au stockage et est autorisé à accéder à DBFS à partir d’un cluster partagé