Compartir a través de


Databricks Connect para Databricks Runtime 12.2 LTS y versiones anteriores

Nota:

Databricks Connect recomienda usar Databricks Connect para Databricks Runtime 13.0 y versiones posteriores en su lugar.

Databricks no planea ningún trabajo de nuevas características en Databricks Connect para Databricks Runtime 12.2 LTS y versiones anteriores.

Databricks Connect permite conectar a clústeres de Azure Databricks entornos de desarrollo integrado populares, como Visual Studio Code y PyCharm, servidores de cuadernos y otras aplicaciones personalizadas.

En este artículo, se explica el funcionamiento de Databricks Connect, los pasos para empezar a utilizarlo, cómo solucionar los problemas que puedan surgir al usarlo y las diferencias entre la ejecución con Databricks Connect y la ejecución en un cuaderno de Azure Databricks.

Información general

Databricks Connect es una biblioteca cliente para Databricks Runtime. Permite escribir trabajos mediante las API de Spark y ejecutarlos de manera remota en un clúster de Azure Databricks en lugar de hacerlo en la sesión local de Spark.

Por ejemplo, al ejecutar el comando DataFrame spark.read.format(...).load(...).groupBy(...).agg(...).show() mediante Databricks Connect, la representación lógica del comando se envía al servidor Spark que se ejecuta en Azure Databricks para su ejecución en el clúster remoto.

Con Databricks Connect, puede:

  • Ejecutar trabajos de Spark a gran escala desde cualquier aplicación de Python, R, Scale o Java. En cualquier lugar donde pueda ejecutar import pyspark, require(SparkR) o import org.apache.spark, ahora puede ejecutar trabajos de Spark directamente desde la aplicación sin necesidad de instalar ningún complemento de IDE ni utilizar scripts de envío de Spark.
  • Ejecutar paso a paso y depurar código en el IDE incluso si se trabaja con un clúster remoto.
  • Iterar rápidamente al desarrollar bibliotecas. No es necesario reiniciar el clúster después de cambiar las dependencias de biblioteca de Python o Java en Databricks Connect, porque las sesiones de cliente están aisladas entre sí en el clúster.
  • Apagar los clústeres inactivos sin perder el trabajo realizado. Como la aplicación cliente está desacoplada del clúster, no se ve afectada al reiniciar o actualizar el clúster, lo que normalmente provocaría la pérdida de todas las variables, RDD y objetos DataFrame definidos en un cuaderno.

Nota:

Para el desarrollo de Python con consultas SQL, Databricks recomienda utilizar Databricks SQL Connector para Python en lugar de Databricks Connect. Este conector es más fácil de configurar que Databricks Connect. Además, Databricks Connect analiza y planea las ejecuciones de trabajos en la máquina local, mientras que los trabajos se ejecutan en recursos de proceso remotos. Esto puede dificultar especialmente la depuración de errores en tiempo de ejecución. Databricks SQL Connector para Python envía consultas SQL directamente a recursos de proceso remotos y captura los resultados.

Requisitos

En esta sección se enumeran los requisitos de Databricks Connect.

  • Solo se admiten las versiones de Databricks Runtime siguientes:

    • Databricks Runtime 12.2 LTS ML, Databricks Runtime 12.2 LTS
    • Databricks Runtime 11.3 LTS ML, Databricks Runtime 11.3 LTS
    • Databricks Runtime 10.4 LTS ML, Databricks Runtime 10.4 LTS
    • Databricks Runtime 9.1 LTS ML, Databricks Runtime 9.1 LTS
    • Databricks Runtime 7.3 LTS
  • Debe instalar Python 3 en la máquina de desarrollo y la versión secundaria de la instalación de Python del cliente debe ser la misma que la versión secundaria de Python del clúster de Azure Databricks. En la tabla siguiente se muestra la versión de Python instalada con cada versión de Databricks Runtime.

    Versión de Databricks Runtime Versión de Python
    12.2 LTS ML, 12.2 LTS 3.9
    11.3 LTS ML, 11.3 LTS 3.9
    10.4 LTS ML, 10.4 LTS 3.8
    9.1 LTS ML, 9.1 LTS 3.8
    7.3 LTS 3.7

    Databricks recomienda que tenga un entorno virtual de Python activado para cada versión de Python que use con Databricks Connect. Los entornos virtuales de Python ayudan a garantizar que usa las versiones correctas de Python y Databricks Connect juntas. Esto puede ayudar a reducir el tiempo que se tarda en resolver problemas técnicos relacionados.

    Por ejemplo, si usa venv en la máquina de desarrollo y el clúster ejecuta Python 3.9, debe crear un venv entorno con esa versión. El siguiente comando de ejemplo genera los scripts para activar un venv entorno con Python 3.9 y, a continuación, este comando coloca esos scripts en una carpeta oculta denominada .venv dentro del directorio de trabajo actual:

    # Linux and macOS
    python3.9 -m venv ./.venv
    
    # Windows
    python3.9 -m venv .\.venv
    

    Para activar este venv entorno con estos scripts, consulte Funcionamiento de venvs.

    Otro ejemplo: si usa Conda en la máquina de desarrollo y el clúster ejecuta Python 3.9, debe crear un entorno de Conda con esa versión; por ejemplo:

    conda create --name dbconnect python=3.9
    

    Para activar el entorno de Conda con este nombre de entorno, ejecute conda activate dbconnect.

  • La versión de los paquetes principal y secundario de Databricks Connect de siempre debe coincidir con la versión de Databricks Runtime. Databricks recomienda usar siempre el paquete más reciente de Databricks Connect que coincida con la versión de Databricks Runtime. Por ejemplo, si usa un clúster de Databricks Runtime 12.2, debe usar el paquete databricks-connect==12.2.*.

    Nota:

    Consulte las notas de la versión de Databricks Connect para una lista de las actualizaciones de mantenimiento y las versiones disponibles de Databricks Connect.

  • Java Runtime Environment (JRE) 8. El cliente se probó con el entorno OpenJDK 8 JRE. El cliente no admite Java 11.

Nota:

En Windows, si ve un error que indica que Databricks Connect no encuentra winutils.exe, consulte No se encuentra winutils.exe en Windows.

Configurar el cliente

Complete los pasos siguientes para configurar el cliente local de Databricks Connect.

Nota:

Antes de empezar a configurar el cliente de Databricks Connect, debe cumplir con los requisitos de este.

Paso 1: Instale el cliente de Databricks Connect

  1. Con el entorno virtual activado, desinstale PySpark, si ya está instalado, ejecutando el comando uninstall. Esto es necesario porque el paquete databricks-connect entra en conflicto con PySpark. Para información detallada, consulte Instalaciones de PySpark en conflicto. Para comprobar si PySpark ya está instalado, ejecute el comando show.

    # Is PySpark already installed?
    pip3 show pyspark
    
    # Uninstall PySpark
    pip3 uninstall pyspark
    
  2. Con el entorno virtual aún activado, instale el cliente de Databricks Connect mediante la ejecución del comando install. Use la opción --upgrade para actualizar cualquier instalación de cliente existente a la versión especificada.

    pip3 install --upgrade "databricks-connect==12.2.*"  # Or X.Y.* to match your cluster version.
    

    Nota:

    Databricks recomienda anexar la notación "dot-asterisk" para especificar databricks-connect==X.Y.* en lugar de databricks-connect=X.Y, para asegurarse de que está instalado el paquete más reciente.

Paso 2: Configuración de las propiedades de conexión

  1. Recopile las propiedades de configuración siguientes.

  2. Configure la conexión de la manera siguiente.

    Puede usar la CLI, configuraciones de SQL o variables de entorno. La prioridad de los métodos de configuración es, de mayor a menor: claves de configuración de SQL, la CLI y variables de entorno.

    • CLI

      1. Ejecute databricks-connect.

        databricks-connect configure
        

        La licencia muestra:

        Copyright (2018) Databricks, Inc.
        
        This library (the "Software") may not be used except in connection with the
        Licensee's use of the Databricks Platform Services pursuant to an Agreement
          ...
        
      2. Acepte la licencia y suministre los valores de configuración. En Host de Databricks y Token de Databricks, escriba la dirección URL del área de trabajo y el token de acceso personal que anotó en el paso 1.

        Do you accept the above agreement? [y/N] y
        Set new config values (leave input empty to accept default):
        Databricks Host [no current value, must start with https://]: <databricks-url>
        Databricks Token [no current value]: <databricks-token>
        Cluster ID (e.g., 0921-001415-jelly628) [no current value]: <cluster-id>
        Org ID (Azure-only, see ?o=orgId in URL) [0]: <org-id>
        Port [15001]: <port>
        

        Si recibe un mensaje que indica que el token de Azure Microsoft Entra ID es demasiado largo, puede dejar vacío el campo Token de Databricks y escribir manualmente el token en ~/.databricks-connect.

    • Configuraciones de SQL o variables de entorno. En la tabla siguiente, se muestran las claves de configuración de SQL y las variables de entorno que corresponden a las propiedades de configuración que anotó en el paso 1. Para establecer una clave de configuración de SQL, utilice sql("set config=value"). Por ejemplo: sql("set spark.databricks.service.clusterId=0304-201045-abcdefgh").

      Parámetro Clave de configuración de SQL Nombre de la variable de entorno
      Host de Databricks spark.databricks.service.address DATABRICKS_ADDRESS
      Token de Databricks spark.databricks.service.token DATABRICKS_API_TOKEN
      Identificador de clúster spark.databricks.service.clusterId DATABRICKS_CLUSTER_ID
      Identificador de la organización spark.databricks.service.orgId DATABRICKS_ORG_ID
      Port spark.databricks.service.port DATABRICKS_PORT
  3. Con el entorno virtual aún activado, pruebe la conectividad con Azure Databricks como se indica a continuación.

    databricks-connect test
    

    Si el clúster que configuró no está en ejecución, la prueba inicia el clúster, que se ejecutará hasta la hora de finalización automática configurada. La salida debe tener una apariencia similar a la siguiente:

    * PySpark is installed at /.../.../pyspark
    * Checking java version
    java version "1.8..."
    Java(TM) SE Runtime Environment (build 1.8...)
    Java HotSpot(TM) 64-Bit Server VM (build 25..., mixed mode)
    * Testing scala command
    ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    ../../.. ..:..:.. WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
    ../../.. ..:..:.. WARN SparkServiceRPCClient: Now tracking server state for 5ab..., invalidating prev state
    ../../.. ..:..:.. WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2...
          /_/
    
    Using Scala version 2.... (Java HotSpot(TM) 64-Bit Server VM, Java 1.8...)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> spark.range(100).reduce(_ + _)
    Spark context Web UI available at https://...
    Spark context available as 'sc' (master = local[*], app id = local-...).
    Spark session available as 'spark'.
    View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
    View job details at <databricks-url>?o=0#/setting/clusters/<cluster-id>/sparkUi
    res0: Long = 4950
    
    scala> :quit
    
    * Testing python command
    ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    ../../.. ..:..:.. WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
    ../../.. ..:..:.. WARN SparkServiceRPCClient: Now tracking server state for 5ab.., invalidating prev state
    View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
    
  4. Si no se muestran errores relacionados con la conexión (los mensajes WARN están bien), se ha conectado correctamente.

Uso de Databricks Connect

En la sección, se describe cómo configurar el IDE o el servidor de cuadernos que prefiera para utilizar el cliente de Databricks Connect.

En esta sección:

JupyterLab

Nota:

Antes de empezar a usar Databricks Connect, debe cumplir con los requisitos y configurar el cliente de Databricks Connect.

Si quiere usar Databricks Connect con JupyterLab y Python, siga estas instrucciones.

  1. Para instalar JupyterLab, con el entorno virtual de Python activado, ejecute el siguiente comando desde el terminal o el símbolo del sistema:

    pip3 install jupyterlab
    
  2. Para iniciar JupyterLab en el explorador web, ejecute el siguiente comando desde el entorno virtual de Python activado:

    jupyter lab
    

    Si JupyterLab no aparece en el explorador web, copie la dirección URL que comienza por localhost o 127.0.0.1 desde su entorno virtual y escríbala en la barra de direcciones del explorador web.

  3. Crear un cuaderno nuevo: en el menú principal de JupyterLab, haga clic en Archivo > Nuevo > Cuaderno , seleccione Python 3 (ipykernel) y haga clic en Seleccionar.

  4. En la primera celda del cuaderno, escriba el código de ejemplo o su propio código. Si usa su propio código, como mínimo debe crear una instancia de SparkSession.builder.getOrCreate(), como se muestra en el código de ejemplo.

  5. Para ejecutar el cuaderno, haga clic en Run > Run All Cells.

  6. Para depurar el cuaderno, haga clic en el icono de error (Enable Debugger) situado junto a Python 3 (ipykernel) en la barra de herramientas del cuaderno. Establezca uno o varios puntos de interrupción y, a continuación, haga clic en Run > Run All Cells.

  7. Para apagar JupyterLab, haga clic en File > Shut Down. Si el proceso de JupyterLab todavía se está ejecutando en el terminal o en el símbolo del sistema, detenga este proceso presionando Ctrl + c y después escribiendo y para confirmar.

Para obtener instrucciones de depuración más específicas, consulte Depurador.

Jupyter Notebook clásico

Nota:

Antes de empezar a usar Databricks Connect, debe cumplir con los requisitos y configurar el cliente de Databricks Connect.

El script de configuración de Databricks Connect agrega el paquete a la configuración del proyecto automáticamente. Para empezar a trabajar en un kernel de Python, ejecute:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Si desea habilitar la abreviatura %sql para ejecutar y visualizar consultas SQL, utilice el fragmento de código siguiente:

from IPython.core.magic import line_magic, line_cell_magic, Magics, magics_class

@magics_class
class DatabricksConnectMagics(Magics):

   @line_cell_magic
   def sql(self, line, cell=None):
       if cell and line:
           raise ValueError("Line must be empty for cell magic", line)
       try:
           from autovizwidget.widget.utils import display_dataframe
       except ImportError:
           print("Please run `pip install autovizwidget` to enable the visualization widget.")
           display_dataframe = lambda x: x
       return display_dataframe(self.get_spark().sql(cell or line).toPandas())

   def get_spark(self):
       user_ns = get_ipython().user_ns
       if "spark" in user_ns:
           return user_ns["spark"]
       else:
           from pyspark.sql import SparkSession
           user_ns["spark"] = SparkSession.builder.getOrCreate()
           return user_ns["spark"]

ip = get_ipython()
ip.register_magics(DatabricksConnectMagics)

Visual Studio Code

Nota:

Antes de empezar a usar Databricks Connect, debe cumplir con los requisitos y configurar el cliente de Databricks Connect.

Para usar Databricks Connect con Visual Studio Code, haga lo siguiente:

  1. Compruebe que la extensión de Python está instalada.

  2. Abra la paleta de comandos (Comando+Mayús+P en macOS y Ctrl+Mayús+P en Windows/Linux).

  3. Seleccione un intérprete de Python. Vaya a Code (Código) > Preferences (Preferencias) > Settings (Configuración) y elija python settings (configuración de Python).

  4. Ejecute databricks-connect get-jar-dir.

  5. Agregue el directorio que el comando devolvió al JSON de configuración de usuario en python.venvPath. Esto se debe agregar a la configuración de Python.

  6. Deshabilite el linter. Haga clic en los puntos suspensivos que se encuentran al lado derecho y edite la configuración de JSON. La configuración modificada es la siguiente:

    Configuración de VS Code

  7. Si utiliza un entorno virtual, que es el método recomendado de desarrollo para Python en VS Code, escriba select python interpreter en la paleta de comandos y apunte al entorno que coincide con la versión de Python del clúster.

    Seleccionar el intérprete de Python

    Por ejemplo, si el clúster es Python 3.9, el entorno de desarrollo debe ser Python 3.9.

    Versión de Python

PyCharm

Nota:

Antes de empezar a usar Databricks Connect, debe cumplir con los requisitos y configurar el cliente de Databricks Connect.

El script de configuración de Databricks Connect agrega el paquete a la configuración del proyecto automáticamente.

Clústeres de Python 3

  1. Al crear un proyecto de PyCharm, seleccione Existing Interpreter (Intérprete existente). En el menú desplegable, seleccione el entorno de Conda que creó (consulte Requisitos).

    Seleccionar intérprete

  2. Vaya a Ejecutar > Editar configuraciones.

  3. Agregue PYSPARK_PYTHON=python3 como una variable de entorno.

    Configuración de clúster Python 3

SparkR y RStudio Desktop

Nota:

Antes de empezar a usar Databricks Connect, debe cumplir con los requisitos y configurar el cliente de Databricks Connect.

Para usar Databricks Connect con SparkR y RStudio Desktop, haga lo siguiente:

  1. Descargue y desempaquete la distribución del código abierto de Spark en la máquina de desarrollo. Elija la misma versión que en el clúster de Azure Databricks (Hadoop 2.7).

  2. Ejecute databricks-connect get-jar-dir. Este comando devuelve una ruta de acceso como /usr/local/lib/python3.5/dist-packages/pyspark/jars. Copie la ruta de acceso de archivo de un directorio sobre la ruta de acceso de archivo de directorio JAR, como /usr/local/lib/python3.5/dist-packages/pyspark, que es el directorio SPARK_HOME.

  3. Configure la ruta de acceso de la biblioteca de Spark y la página principal de Spark agregándolos a la parte superior del script de R. Establezca <spark-lib-path> en el directorio en el que desempaquetó el paquete de Spark de código abierto en el paso 1. Establezca <spark-home-path> en el directorio de Databricks Connect del paso 2.

    # Point to the OSS package path, e.g., /path/to/.../spark-2.4.0-bin-hadoop2.7
    library(SparkR, lib.loc = .libPaths(c(file.path('<spark-lib-path>', 'R', 'lib'), .libPaths())))
    
    # Point to the Databricks Connect PySpark installation, e.g., /path/to/.../pyspark
    Sys.setenv(SPARK_HOME = "<spark-home-path>")
    
  4. Inicie una sesión de Spark y empiece a ejecutar comandos de SparkR.

    sparkR.session()
    
    df <- as.DataFrame(faithful)
    head(df)
    
    df1 <- dapply(df, function(x) { x }, schema(df))
    collect(df1)
    

sparklyr y RStudio Desktop

Nota:

Antes de empezar a usar Databricks Connect, debe cumplir con los requisitos y configurar el cliente de Databricks Connect.

Importante

Esta característica está en versión preliminar pública.

Puede copiar código dependiente de sparklyr que haya desarrollado localmente mediante Databricks Connect y ejecutarlo en un cuaderno de Azure Databricks o en una instancia de RStudio Server hospedada en el área de trabajo de Azure Databricks con cambios de código mínimos o ningún cambio.

En esta sección:

Requisitos

  • sparklyr 1.2 o posterior.
  • Databricks Runtime 7.3 LTS o versiones posteriores con la versión coincidente de Databricks Connect.

Instalación, configuración y uso de sparklyr

  1. En RStudio Desktop, instale sparklyr 1.2 o posterior desde CRAN o instale la versión maestra más reciente desde GitHub.

    # Install from CRAN
    install.packages("sparklyr")
    
    # Or install the latest master version from GitHub
    install.packages("devtools")
    devtools::install_github("sparklyr/sparklyr")
    
  2. Active el entorno de Python con la versión correcta de Databricks Connect instalado y ejecute el comando siguiente en el terminal para obtener <spark-home-path>:

    databricks-connect get-spark-home
    
  3. Inicie una sesión de Spark y empiece a ejecutar comandos de sparklyr.

    library(sparklyr)
    sc <- spark_connect(method = "databricks", spark_home = "<spark-home-path>")
    
    iris_tbl <- copy_to(sc, iris, overwrite = TRUE)
    
    library(dplyr)
    src_tbls(sc)
    
    iris_tbl %>% count
    
  4. Cerrar la conexión.

    spark_disconnect(sc)
    

Recursos

Para más información, consulte LÉAME de GitHub sobre sparklyr.

Para ejemplos de código, consulte sparklyr.

Limitaciones de sparklyr y RStudio Desktop

No se admiten las características siguientes:

  • API de streaming de sparklyr
  • API de ML de sparklyr
  • API de broom
  • Modo de serialización de csv_file
  • spark-submit

IntelliJ (Scala o Java)

Nota:

Antes de empezar a usar Databricks Connect, debe cumplir con los requisitos y configurar el cliente de Databricks Connect.

Para usar Databricks Connect con IntelliJ (Scala o Java), haga lo siguiente:

  1. Ejecute databricks-connect get-jar-dir.

  2. Apunte las dependencias al directorio que el comando devolvió. Vaya a File (Archivo) > Project Structure (Estructura del proyecto) > Modules (Módulos) > Dependencies (Dependencias) > ‘+’ sign (signo "+") > JARs or Directories (Archivos JAR o directorios).

    IntelliJ JARs

    Para evitar conflictos, se recomienda quitar cualquier otra instalación de Spark de la ruta de clase. Si no es posible, asegúrese de que los archivos JAR que agregue estén al inicio de la ruta de clase. En concreto, deben estar por delante de cualquier otra versión instalada de Spark (de lo contrario, utilizará una de esas otras versiones de Spark y la ejecutará localmente o generará un error ClassDefNotFoundError).

  3. Compruebe la configuración de la opción de interrupción de IntelliJ. El valor predeterminado es All (Todos) y generará tiempos de espera de red si establece puntos de interrupción para la depuración. Establezca este valor en Thread (Subproceso) para evitar detener los subprocesos de red en segundo plano.

    IntelliJ Thread

PyDev con Eclipse

Nota:

Antes de empezar a usar Databricks Connect, debe cumplir con los requisitos y configurar el cliente de Databricks Connect.

Para usar Databricks Connect y PyDev con Eclipse, siga estas instrucciones.

  1. Inicie Eclipse.
  2. Cree un proyecto: haga clic en Archivo > Nuevo > Proyecto > PyDev > Proyecto de PyDev y , a continuación, haga clic en Siguiente.
  3. Especifique un nombre de proyecto.
  4. En Contenido del proyecto, especifique la ruta de acceso al entorno virtual de Python.
  5. Haga clic en Configurar un intérprete antes de continuar.
  6. Haga clic en Configuración manual.
  7. Haga clic en Nuevo > Buscar python/pypy exe.
  8. Navegue hasta la ruta de acceso completa al intérprete de Python al que se hace referencia en el entorno virtual y selecciónela. A continuación, haga clic en Abrir.
  9. En el diálogo Seleccionar intérprete, haga clic en Aceptar.
  10. En el diálogo Selección necesaria, haga clic en Aceptar.
  11. En el diálogo Preferencias, haga clic en Aplicar y cerrar.
  12. En el diálogo Proyecto de PyDev, haga clic en Finalizar.
  13. Haga clic en Abrir perspectiva.
  14. Agregue al proyecto un archivo de código de Python (.py) que contenga el código de ejemplo o su propio código. Si usa su propio código, como mínimo debe crear una instancia de SparkSession.builder.getOrCreate(), como se muestra en el código de ejemplo.
  15. Con el archivo de código de Python abierto, establezca los puntos de interrupción en los que quiera que se detenga el código cuando se ejecuta.
  16. Haga clic en Run > Run o en Run > Debug.

Para obtener instrucciones de ejecución y depuración más específicas, consulte Ejecución de un programa.

Eclipse

Nota:

Antes de empezar a usar Databricks Connect, debe cumplir con los requisitos y configurar el cliente de Databricks Connect.

Para usar Databricks Connect y Eclipse, haga lo siguiente:

  1. Ejecute databricks-connect get-jar-dir.

  2. Apunte la configuración de los archivos JAR externos al directorio que el comando devolvió. Vaya a Project menu (Menú del proyecto) > Properties (Propiedades) > Java Build Path (Ruta de acceso de compilación de Java) > Libraries (Bibliotecas) > Add External Jars (Agregar JAR externos).

    Configuración externa Eclipse JAR

    Para evitar conflictos, se recomienda quitar cualquier otra instalación de Spark de la ruta de clase. Si no es posible, asegúrese de que los archivos JAR que agregue estén al inicio de la ruta de clase. En concreto, deben estar por delante de cualquier otra versión instalada de Spark (de lo contrario, utilizará una de esas otras versiones de Spark y la ejecutará localmente o generará un error ClassDefNotFoundError).

    Configuración de Eclipse Spark

SBT

Nota:

Antes de empezar a usar Databricks Connect, debe cumplir con los requisitos y configurar el cliente de Databricks Connect.

Para usar Databricks Connect con SBT, debe configurar el archivo build.sbt para que se vincule con los JAR de Databricks Connect en lugar de la dependencia de la biblioteca de Spark habitual. Esto puede hacerlo con la directiva unmanagedBase con el archivo de compilación de ejemplo siguiente, el que presupone una aplicación de Scala que tiene un objeto principal com.example.Test:

build.sbt

name := "hello-world"
version := "1.0"
scalaVersion := "2.11.6"
// this should be set to the path returned by ``databricks-connect get-jar-dir``
unmanagedBase := new java.io.File("/usr/local/lib/python2.7/dist-packages/pyspark/jars")
mainClass := Some("com.example.Test")

Shell de Spark

Nota:

Antes de empezar a usar Databricks Connect, debe cumplir con los requisitos y configurar el cliente de Databricks Connect.

Si quiere usar Databricks Connect con el shell de Spark y Python o Scala, siga estas instrucciones.

  1. Con el entorno virtual activado, asegúrese de que el databricks-connect test comando se ejecutó correctamente en Configurar el cliente.

  2. Con el entorno virtual activado, inicie el shell de Spark. Para Python, ejecute el comando pyspark. En Scala, ejecute el comando spark-shell.

    # For Python:
    pyspark
    
    # For Scala:
    spark-shell
    
  3. Aparece el shell de Spark, por ejemplo para Python:

    Python 3... (v3...)
    [Clang 6... (clang-6...)] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Welcome to
           ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3....
          /_/
    
    Using Python version 3... (v3...)
    Spark context Web UI available at http://...:...
    Spark context available as 'sc' (master = local[*], app id = local-...).
    SparkSession available as 'spark'.
    >>>
    

    Para Scala:

    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Spark context Web UI available at http://...
    Spark context available as 'sc' (master = local[*], app id = local-...).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 3...
          /_/
    
    Using Scala version 2... (OpenJDK 64-Bit Server VM, Java 1.8...)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala>
    
  4. Consulte Análisis interactivo con el shell de Spark para obtener información sobre cómo usar el shell de Spark con Python o Scala para ejecutar comandos en el clúster.

    Use la variable integrada spark para representar el elemento SparkSession del clúster en ejecución, por ejemplo para Python:

    >>> df = spark.read.table("samples.nyctaxi.trips")
    >>> df.show(5)
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    |tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip|
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    | 2016-02-14 16:52:13|  2016-02-14 17:16:04|         4.94|       19.0|     10282|      10171|
    | 2016-02-04 18:44:19|  2016-02-04 18:46:00|         0.28|        3.5|     10110|      10110|
    | 2016-02-17 17:13:57|  2016-02-17 17:17:55|          0.7|        5.0|     10103|      10023|
    | 2016-02-18 10:36:07|  2016-02-18 10:41:45|          0.8|        6.0|     10022|      10017|
    | 2016-02-22 14:14:41|  2016-02-22 14:31:52|         4.51|       17.0|     10110|      10282|
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    only showing top 5 rows
    

    Para Scala:

    >>> val df = spark.read.table("samples.nyctaxi.trips")
    >>> df.show(5)
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    |tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip|
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    | 2016-02-14 16:52:13|  2016-02-14 17:16:04|         4.94|       19.0|     10282|      10171|
    | 2016-02-04 18:44:19|  2016-02-04 18:46:00|         0.28|        3.5|     10110|      10110|
    | 2016-02-17 17:13:57|  2016-02-17 17:17:55|          0.7|        5.0|     10103|      10023|
    | 2016-02-18 10:36:07|  2016-02-18 10:41:45|          0.8|        6.0|     10022|      10017|
    | 2016-02-22 14:14:41|  2016-02-22 14:31:52|         4.51|       17.0|     10110|      10282|
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    only showing top 5 rows
    
  5. Para detener el shell de Spark, presione Ctrl + d o Ctrl + z, o ejecute el comando quit() o exit() para Python o :q:quit para Scala.

Ejemplos de código

Este ejemplo de código simple consulta la tabla especificada y, a continuación, muestra las cinco primeras filas de la tabla especificada. Para usar otra tabla, ajuste la llamada a spark.read.table.

from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.table("samples.nyctaxi.trips")
df.show(5)

Este ejemplo de código más largo hace lo siguiente:

  1. Crea un DataFrame en memoria.
  2. Crea una tabla con el nombre zzz_demo_temps_table dentro del esquema default. Si la tabla con este nombre ya existe, primero se elimina la tabla. Para usar un esquema o tabla diferente, ajuste las llamadas a spark.sql, temps.write.saveAsTable o ambas.
  3. Guarda el contenido de DataFrame en la tabla.
  4. Ejecuta una consulta SELECT en el contenido de la tabla.
  5. Muestra el resultado de la consulta.
  6. Elimina la tabla.

Python

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import date

spark = SparkSession.builder.appName('temps-demo').getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
    StructField('AirportCode', StringType(), False),
    StructField('Date', DateType(), False),
    StructField('TempHighF', IntegerType(), False),
    StructField('TempLowF', IntegerType(), False)
])

data = [
    [ 'BLI', date(2021, 4, 3), 52, 43],
    [ 'BLI', date(2021, 4, 2), 50, 38],
    [ 'BLI', date(2021, 4, 1), 52, 41],
    [ 'PDX', date(2021, 4, 3), 64, 45],
    [ 'PDX', date(2021, 4, 2), 61, 41],
    [ 'PDX', date(2021, 4, 1), 66, 39],
    [ 'SEA', date(2021, 4, 3), 57, 43],
    [ 'SEA', date(2021, 4, 2), 54, 39],
    [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
    "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
    "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
    "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')

Scala

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Date

object Demo {
  def main(args: Array[String]) {
      val spark = SparkSession.builder.master("local").getOrCreate()

      // Create a Spark DataFrame consisting of high and low temperatures
      // by airport code and date.
      val schema = StructType(Array(
        StructField("AirportCode", StringType, false),
        StructField("Date", DateType, false),
        StructField("TempHighF", IntegerType, false),
        StructField("TempLowF", IntegerType, false)
      ))

      val data = List(
        Row("BLI", Date.valueOf("2021-04-03"), 52, 43),
        Row("BLI", Date.valueOf("2021-04-02"), 50, 38),
        Row("BLI", Date.valueOf("2021-04-01"), 52, 41),
        Row("PDX", Date.valueOf("2021-04-03"), 64, 45),
        Row("PDX", Date.valueOf("2021-04-02"), 61, 41),
        Row("PDX", Date.valueOf("2021-04-01"), 66, 39),
        Row("SEA", Date.valueOf("2021-04-03"), 57, 43),
        Row("SEA", Date.valueOf("2021-04-02"), 54, 39),
        Row("SEA", Date.valueOf("2021-04-01"), 56, 41)
      )

      val rdd = spark.sparkContext.makeRDD(data)
      val temps = spark.createDataFrame(rdd, schema)

      // Create a table on the Databricks cluster and then fill
      // the table with the DataFrame's contents.
      // If the table already exists from a previous run,
      // delete it first.
      spark.sql("USE default")
      spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
      temps.write.saveAsTable("zzz_demo_temps_table")

      // Query the table on the Databricks cluster, returning rows
      // where the airport code is not BLI and the date is later
      // than 2021-04-01. Group the results and order by high
      // temperature in descending order.
      val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
        "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
        "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
        "ORDER BY TempHighF DESC")
      df_temps.show()

      // Results:
      //
      // +-----------+----------+---------+--------+
      // |AirportCode|      Date|TempHighF|TempLowF|
      // +-----------+----------+---------+--------+
      // |        PDX|2021-04-03|       64|      45|
      // |        PDX|2021-04-02|       61|      41|
      // |        SEA|2021-04-03|       57|      43|
      // |        SEA|2021-04-02|       54|      39|
      // +-----------+----------+---------+--------+

      // Clean up by deleting the table from the Databricks cluster.
      spark.sql("DROP TABLE zzz_demo_temps_table")
  }
}

Java

import java.util.ArrayList;
import java.util.List;
import java.sql.Date;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.Dataset;

public class App {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
            .builder()
            .appName("Temps Demo")
            .config("spark.master", "local")
            .getOrCreate();

        // Create a Spark DataFrame consisting of high and low temperatures
        // by airport code and date.
        StructType schema = new StructType(new StructField[] {
            new StructField("AirportCode", DataTypes.StringType, false, Metadata.empty()),
            new StructField("Date", DataTypes.DateType, false, Metadata.empty()),
            new StructField("TempHighF", DataTypes.IntegerType, false, Metadata.empty()),
            new StructField("TempLowF", DataTypes.IntegerType, false, Metadata.empty()),
        });

        List<Row> dataList = new ArrayList<Row>();
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-03"), 52, 43));
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-02"), 50, 38));
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-01"), 52, 41));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-03"), 64, 45));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-02"), 61, 41));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-01"), 66, 39));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-03"), 57, 43));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-02"), 54, 39));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-01"), 56, 41));

        Dataset<Row> temps = spark.createDataFrame(dataList, schema);

        // Create a table on the Databricks cluster and then fill
        // the table with the DataFrame's contents.
        // If the table already exists from a previous run,
        // delete it first.
        spark.sql("USE default");
        spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table");
        temps.write().saveAsTable("zzz_demo_temps_table");

        // Query the table on the Databricks cluster, returning rows
        // where the airport code is not BLI and the date is later
        // than 2021-04-01. Group the results and order by high
        // temperature in descending order.
        Dataset<Row> df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
            "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
            "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
            "ORDER BY TempHighF DESC");
        df_temps.show();

        // Results:
        //
        // +-----------+----------+---------+--------+
        // |AirportCode|      Date|TempHighF|TempLowF|
        // +-----------+----------+---------+--------+
        // |        PDX|2021-04-03|       64|      45|
        // |        PDX|2021-04-02|       61|      41|
        // |        SEA|2021-04-03|       57|      43|
        // |        SEA|2021-04-02|       54|      39|
        // +-----------+----------+---------+--------+

        // Clean up by deleting the table from the Databricks cluster.
        spark.sql("DROP TABLE zzz_demo_temps_table");
    }
}

Uso de dependencias

Por lo general, la clase principal o el archivo de Python tendrán otros archivos y JAR de dependencia. Para agregar dichos archivos o JAR de dependencia, puede llamar a sparkContext.addJar("path-to-the-jar") o sparkContext.addPyFile("path-to-the-file"). También puede agregar archivos ZIP y archivos EGG con la interfaz addPyFile(). Cada vez que se ejecuta el código en el IDE, los archivos y JAR de dependencia se instalan en el clúster.

Python

from lib import Foo
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext
#sc.setLogLevel("INFO")

print("Testing simple count")
print(spark.range(100).count())

print("Testing addPyFile isolation")
sc.addPyFile("lib.py")
print(sc.parallelize(range(10)).map(lambda i: Foo(2)).collect())

class Foo(object):
  def __init__(self, x):
    self.x = x

Python + UDF de Java

from pyspark.sql import SparkSession
from pyspark.sql.column import _to_java_column, _to_seq, Column

## In this example, udf.jar contains compiled Java / Scala UDFs:
#package com.example
#
#import org.apache.spark.sql._
#import org.apache.spark.sql.expressions._
#import org.apache.spark.sql.functions.udf
#
#object Test {
#  val plusOne: UserDefinedFunction = udf((i: Long) => i + 1)
#}

spark = SparkSession.builder \
  .config("spark.jars", "/path/to/udf.jar") \
  .getOrCreate()
sc = spark.sparkContext

def plus_one_udf(col):
  f = sc._jvm.com.example.Test.plusOne()
  return Column(f.apply(_to_seq(sc, [col], _to_java_column)))

sc._jsc.addJar("/path/to/udf.jar")
spark.range(100).withColumn("plusOne", plus_one_udf("id")).show()

Scala

package com.example

import org.apache.spark.sql.SparkSession

case class Foo(x: String)

object Test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      ...
      .getOrCreate();
    spark.sparkContext.setLogLevel("INFO")

    println("Running simple show query...")
    spark.read.format("parquet").load("/tmp/x").show()

    println("Running simple UDF query...")
    spark.sparkContext.addJar("./target/scala-2.11/hello-world_2.11-1.0.jar")
    spark.udf.register("f", (x: Int) => x + 1)
    spark.range(10).selectExpr("f(id)").show()

    println("Running custom objects query...")
    val objs = spark.sparkContext.parallelize(Seq(Foo("bye"), Foo("hi"))).collect()
    println(objs.toSeq)
  }
}

Acceso a las utilidades de Databricks

En esta sección se describe cómo usar Databricks Connect para acceder a las utilidades de Databricks.

Puede utilizar las utilidades dbutils.fs y dbutils.secrets del módulo Referencia de utilidades de Databricks (dbutils). Los comandos admitidos son dbutils.fs.cp, dbutils.fs.head, dbutils.fs.ls, dbutils.fs.mkdirs, dbutils.fs.mv, dbutils.fs.put, dbutils.fs.rm, dbutils.secrets.get, dbutils.secrets.getBytes, dbutils.secrets.list, dbutils.secrets.listScopes. Consulte el artículo sobre la utilidad del sistema de archivos (dbutils.fs) o ejecute dbutils.fs.help() y el artículo sobre la utilidad de secretos (dbutils.secrets) o ejecute dbutils.secrets.help().

Python

from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils

spark = SparkSession.builder.getOrCreate()

dbutils = DBUtils(spark)
print(dbutils.fs.ls("dbfs:/"))
print(dbutils.secrets.listScopes())

Cuando utilice Databricks Runtime 7.3 LTS o posterior, para acceder al módulo DBUtils de manera que funcione tanto localmente como en clústeres de Azure Databricks, utilice el get_dbutils() siguiente:

def get_dbutils(spark):
  from pyspark.dbutils import DBUtils
  return DBUtils(spark)

En caso contrario, utilice el get_dbutils() siguiente:

def get_dbutils(spark):
  if spark.conf.get("spark.databricks.service.client.enabled") == "true":
    from pyspark.dbutils import DBUtils
    return DBUtils(spark)
  else:
    import IPython
    return IPython.get_ipython().user_ns["dbutils"]

Scala

val dbutils = com.databricks.service.DBUtils
println(dbutils.fs.ls("dbfs:/"))
println(dbutils.secrets.listScopes())

Copia de archivos entre sistemas de archivos locales y remotos

Puede usar dbutils.fs para copiar archivos entre los sistemas de archivos de cliente y remotos. El esquema file:/ hace referencia al sistema de archivos local en el cliente.

from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)

dbutils.fs.cp('file:/home/user/data.csv', 'dbfs:/uploads')
dbutils.fs.cp('dbfs:/output/results.csv', 'file:/home/user/downloads/')

El tamaño de archivo máximo que se puede transferir de esa manera es de 250 MB.

Habilite dbutils.secrets.get.

Debido a las restricciones de seguridad, la capacidad de llamar a dbutils.secrets.get está deshabilitada de manera predeterminada. Póngase en contacto con el soporte técnico de Azure Databricks para habilitar esta característica para el área de trabajo.

Establecimiento de las configuraciones de Hadoop

En el cliente, puede establecer configuraciones de Hadoop mediante la API spark.conf.set, que se aplica a las operaciones de DataFrame y SQL. Las configuraciones de Hadoop establecidas en sparkContext se deben establecer en la configuración del clúster o mediante un cuaderno. Esto se debe a que las configuraciones establecidas en sparkContext no están vinculadas a sesiones de usuario, sino que se aplican a todo el clúster.

Solucionar problemas

Ejecute databricks-connect test para comprobar si hay problemas de conectividad. En esta sección se describen algunos problemas comunes que pueden surgir con Databricks Connect y cómo resolverlos.

En esta sección:

Versión de Python no coincidente

Compruebe que la versión de Python que utiliza localmente tiene al menos la misma versión secundaria que la versión existente en el clúster (por ejemplo, se admite 3.9.16 respecto de 3.9.15, pero no 3.9 respecto de 3.8).

Si tiene varias versiones de Python instaladas en el entorno local, asegúrese de que Databricks Connect utiliza la correcta; para ello, establezca la variable de entorno PYSPARK_PYTHON (por ejemplo, PYSPARK_PYTHON=python3).

Servidor no habilitado

Asegúrese de que el clúster tiene el servidor de Spark habilitado con spark.databricks.service.server.enabled true. Si es así, debería ver las líneas siguientes en el registro del controlador:

../../.. ..:..:.. INFO SparkConfUtils$: Set spark config:
spark.databricks.service.server.enabled -> true
...
../../.. ..:..:.. INFO SparkContext: Loading Spark Service RPC Server
../../.. ..:..:.. INFO SparkServiceRPCServer:
Starting Spark Service RPC Server
../../.. ..:..:.. INFO Server: jetty-9...
../../.. ..:..:.. INFO AbstractConnector: Started ServerConnector@6a6c7f42
{HTTP/1.1,[http/1.1]}{0.0.0.0:15001}
../../.. ..:..:.. INFO Server: Started @5879ms

Instalaciones de PySpark en conflicto

El paquete databricks-connect entra en conflicto con PySpark. Tener ambos instalados generará errores al inicializar el contexto de Spark en Python. Estos conflictos se pueden manifestar de varias maneras, incluidos errores de "flujo dañado" o "no se encontró la clase". Si tiene instalado PySpark en el entorno de Python, asegúrese de desinstalarlo antes de instalar databricks-connect. Una vez que desinstale PySpark, no olvide volver a instalar completamente el paquete de Databricks Connect:

pip3 uninstall pyspark
pip3 uninstall databricks-connect
pip3 install --upgrade "databricks-connect==12.2.*"  # or X.Y.* to match your specific cluster version.

SPARK_HOME en conflicto

Si anteriormente utilizó Spark en la máquina, es posible que el IDE esté configurado para utilizar una de esas otras versiones de Spark en lugar de Databricks Connect Spark. Estos conflictos se pueden manifestar de varias maneras, incluidos errores de "flujo dañado" o "no se encontró la clase". Para ver qué versión de Spark se utiliza, revise el valor de la variable de entorno SPARK_HOME:

Python

import os
print(os.environ['SPARK_HOME'])

Scala

println(sys.env.get("SPARK_HOME"))

Java

System.out.println(System.getenv("SPARK_HOME"));

Solución

Si SPARK_HOME se establece en una versión de Spark que no sea la del cliente, debe anular la variable SPARK_HOME y volver a intentarlo.

Revise la configuración de la variable de entorno del IDE, el archivo .bashrc, .zshrc o .bash_profile, y cualquier otro lugar en el que pueda haber variables de entorno establecidas. Lo más probable es que tenga que salir y reiniciar el IDE para purgar el estado anterior, e incluso puede que tenga que crear un proyecto si el problema persiste.

No es necesario establecer SPARK_HOME en un valor nuevo: anularlo debería ser suficiente.

Entrada PATH faltante o en conflicto para los archivos binarios

Es posible que la ruta de acceso PATH esté configurada para que comandos como spark-shell ejecuten otro archivo binario instalado previamente en lugar del que se incluye con Databricks Connect. Esto puede generar un error de databricks-connect test. Debe asegurarse de que los archivos binarios de Databricks Connect tengan prioridad, o bien quitar los instalados previamente.

Si no puede ejecutar comandos como spark-shell, también es posible que pip3 install no haya configurado automáticamente la ruta de acceso PATH y deberá agregar el directorio bin de instalación a la ruta de acceso PATH de manera manual. Es posible utilizar Databricks Connect con los IDE aunque no esté configurado. Sin embargo, el comando databricks-connect test no funcionará.

Configuración de serialización en conflicto en el clúster

Si ve errores de "flujo dañado" al ejecutar databricks-connect test, puede deberse a configuraciones de serialización de clúster incompatibles. Por ejemplo, establecer la configuración spark.io.compression.codec puede generar este problema. Para resolver, considere la posibilidad de quitar estas configuraciones de la configuración del clúster, o bien establecer la configuración en el cliente de Databricks Connect.

No se encuentra winutils.exe en Windows

Si utiliza Databricks Connect en Windows y ve lo siguiente:

ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

Siga las instrucciones para configurar la ruta de acceso de Hadoop en Windows.

La sintaxis del nombre de archivo, el nombre del directorio o la etiqueta de volumen no es correcta en Windows

Si utiliza Windows y Databricks Connect y ve lo siguiente:

The filename, directory name, or volume label syntax is incorrect.

Significa que Java o Databricks Connect se instaló en un directorio con un espacio en la ruta de acceso. Una solución alternativa es hacer la instalación en una ruta de acceso de directorio sin espacios o configurar la ruta de acceso con el formato de nombre corto.

Autenticación mediante tokens de Microsoft Entra ID

Nota:

La siguiente información se aplica solo a las versiones 7.3.5 a 12.2.x de Databricks Connect.

Actualmente, Databricks Connect para Databricks Runtime 13.3 LTS y versiones posteriores no admite tokens de Microsoft Entra ID.

Cuando usa las versiones 7.3.5 a 12.2.x de Databricks Connect, se puede autenticar mediante un token de Microsoft Entra ID en lugar de un token de acceso personal. Los tokens de Microsoft Entra ID tienen una duración limitada. Cuando el token de Microsoft Entra ID expira, se produce el error Invalid Token en Databricks Connect.

Para las versiones 7.3.5 a 12.2.x de Databricks Connect, puede proporcionar el token de Microsoft Entra ID en la aplicación de Databricks Connect en ejecución. La aplicación debe obtener el token de acceso nuevo y establecerlo en la clave de configuración de SQL spark.databricks.service.token.

Python

spark.conf.set("spark.databricks.service.token", new_aad_token)

Scala

spark.conf.set("spark.databricks.service.token", newAADToken)

Una vez que actualice el token, la aplicación puede seguir utilizando el mismo valor SparkSession y cualquier objeto y estado creado en el contexto de la sesión. Para evitar errores intermitentes, Databricks recomienda proporcionar un token nuevo antes de que el token anterior expire.

Puede extender la vigencia del token de Microsoft Entra ID para que se mantenga durante la ejecución de la aplicación. Para ello, adjunte una directiva TokenLifetimePolicy con una vigencia adecuadamente larga a la aplicación de autorización de Microsoft Entra ID que haya usado para adquirir el token de acceso.

Nota:

El acceso directo de Microsoft Entra ID utiliza dos tokens: el token de acceso de Microsoft Entra ID anteriormente descrito que configura en las versiones 7.3.5 a 12.2.x de Databricks Connect y el token de acceso directo de ADLS para el recurso específico que Databricks genera mientras procesa la solicitud. No puede extender la duración de los tokens de acceso directo de ADLS mediante directivas de duración de tokens de Microsoft Entra ID. Si envía un comando al clúster que tarde más de una hora, se producirá un error si el comando accede a un recurso de ADLS cuando pase esa hora.

Limitaciones

  • Unity Catalog.

  • Structured Streaming.

  • La ejecución de código arbitrario no forma parte de un trabajo de Spark en el clúster remoto.

  • No se admiten las API nativas de Scala, Python y R para operaciones de tabla Delta (por ejemplo, DeltaTable.forPath). Sin embargo, sí se admite SQL API (spark.sql(...)) con operaciones de Delta Lake y Spark API (por ejemplo, spark.read.load) en tablas Delta.

  • Copia.

  • Usar funciones de SQL, UDF de Python o Scala que forman parte del catálogo del servidor. Sin embargo, las UDF de Scala y Python que se introdujeron localmente funcionan.

  • Apache Zeppelin 0.7.x y versiones anteriores.

  • Conexión a clústeres con control de acceso a tablas.

  • Conexión a clústeres con aislamiento de procesos habilitado (en otras palabras, donde spark.databricks.pyspark.enableProcessIsolation se establece en true).

  • Comando SQL CLONE Delta.

  • Vistas temporales globales.

  • Koalas y pyspark.pandas.

  • CREATE TABLE table AS SELECT ... Los comandos SQL no siempre funcionan. En su lugar, use spark.sql("SELECT ...").write.saveAsTable("table").

  • El acceso directo a credenciales de Microsoft Entra ID solo se admite en clústeres estándar que ejecutan Databricks Runtime 7.3 LTS y versiones posteriores, y no es compatible con la autenticación de la entidad de servicio.

  • La siguiente Referencia de utilidades de Databricks (dbutils):