Databricks Connect

참고

로컬 개발의 경우 Databricks는 Databricks Labs에서 Databricks Connect가 아닌 dbx 사용을 권장합니다.

Databricks Connect를 사용하면 즐겨 사용하는 IDE(Eclipse, IntelliJ, PyCharm, RStudio, Visual Studio Code), Notebook 서버(Jupyter Notebook, Zeppelin) 및 기타 사용자 지정 애플리케이션을 Azure Databricks 클러스터에 연결할 수 있습니다.

이 문서에서는 Databricks Connect가 작동하는 방법을 설명하고, Databricks Connect를 시작하는 단계를 안내하고, Databricks Connect를 사용할 때 발생할 수 있는 문제를 해결하는 방법 및 Databricks Connect를 사용하여 실행하는 것과 Azure Databricks Notebook에서 실행하는 것 간의 차이점을 설명합니다.

개요

Databricks Connect는 Databricks Runtime용 클라이언트 라이브러리입니다. 이를 통해 Spark API를 사용하여 작업을 작성하고 로컬 Spark 세션 대신 Azure Databricks 클러스터에서 원격으로 실행할 수 있습니다.

예를 들어 Databricks Connect를 사용하여 DataFrame 명령 spark.read.format("parquet").load(...).groupBy(...).agg(...).show()을(를) 실행하면 작업의 구문 분석 및 계획이 로컬 컴퓨터에서 실행됩니다. 그런 다음 작업의 논리적 표현은 클러스터에서 실행하기 위해 Azure Databricks에서 실행되는 Spark 서버로 전송됩니다.

Databricks Connect를 사용하면 다음을 수행할 수 있습니다.

  • Python, Java, Scala 또는 R 애플리케이션에서 대규모 Spark 작업을 실행. import pyspark, import org.apache.spark 또는 require(SparkR)이(가) 가능한 곳이라면 어디서든 IDE 플러그 인을 설치하거나 Spark 제출 스크립트를 사용할 필요 없이 애플리케이션에서 직접 Spark 작업을 실행할 수 있습니다.
  • 원격 클러스터로 작업하는 경우에도 IDE에서 코드를 단계별로 실행하고 디버그.
  • 라이브러리를 개발할 때 빠르게 반복. 각 클라이언트 세션이 클러스터에서 서로 격리되므로 Databricks Connect의 Python 또는 Java 라이브러리 종속성을 변경한 후에 클러스터를 다시 시작할 필요가 없습니다.
  • 작업 손실 없이 유휴 클러스터를 종료. 클라이언트 애플리케이션은 클러스터에서 분리되므로, 일반적으로 Notebook에 정의된 모든 변수, RDD 및 DataFrame 개체 손실을 유발하는 클러스터 다시 시작 또는 업그레이드의 영향을 받지 않습니다.

참고

SQL 쿼리를 사용한 Python 개발의 경우 Databricks는 Databricks Connect 대신 Python용 Databricks SQL Connector를 사용하도록 권장합니다. Python용 Databricks SQL 커넥터는 Databricks Connect보다 설정하기 쉽습니다. 또한 Databricks Connect의 구문 분석 및 계획 작업은 로컬 컴퓨터에서 실행되는 반면, 작업은 원격 컴퓨팅 리소스에서 실행됩니다. 따라서 런타임 오류를 디버그하기가 특히 어려울 수 있습니다. Python용 Databricks SQL Connector는 SQL 쿼리를 원격 컴퓨팅 리소스에 직접 제출하고 결과를 가져옵니다.

요구 사항

  • 다음 Databricks Runtime 버전만 지원됩니다.

    • Databricks Runtime 11.3 LTS ML, Databricks Runtime 11.3 LTS
    • Databricks Runtime 10.4 LTS ML, Databricks Runtime 10.4 LTS
    • Databricks Runtime 9.1 LTS ML, Databricks Runtime 9.1 LTS
    • Databricks Runtime 7.3 LTS ML, Databricks Runtime 7.3 LTS
  • 클라이언트 Python 설치의 부 버전은 Azure Databricks 클러스터의 부 Python 버전과 동일해야 합니다. 표에는 각 Databricks Runtime과 함께 설치된 Python 버전이 표시됩니다.

    Databricks Runtime 버전 Python 버전
    11.3 LTS ML, 11.3 LTS 3.9
    10.4 LTS ML, 10.4 LTS 3.8
    9.1 LTS ML, 9.1 LTS 3.8
    7.3 LTS ML, 7.3 LTS 3.7

    예를 들어, 로컬 개발 환경에서 Conda를 사용하고 클러스터가 Python 3.7을 실행하는 경우 해당 버전을 사용하여 환경을 만들어야 합니다. 예를 들면 다음과 같습니다.

    conda create --name dbconnect python=3.7
    conda
    
  • Databricks Connect 주 및 부 패키지 버전은 항상 Databricks Runtime 버전과 일치해야 합니다. Databricks는 항상 Databricks Runtime 버전과 일치하는 Databricks Connect의 최신 패키지를 사용할 것을 권장합니다. 예를 들어 Databricks Runtime 7.3 LTS 클러스터를 사용하는 경우 databricks-connect==7.3.* 패키지를 사용합니다.

    참고

    사용 가능한 Databricks Connect 릴리스 및 유지 관리 업데이트의 목록은 Databricks Connect 릴리스 정보를 참조하세요.

  • Java Runtime Environment(JRE) 8. 클라이언트가 OpenJDK 8 JRE를 사용하여 테스트되었습니다. 클라이언트는 Java 11을 지원하지 않습니다.

참고

Windows에서 Databricks Connect가 winutils.exe을(를) 찾을 수 없다는 오류가 표시되면 Windows에서 winutils.exe를 찾을 수 없음을 참조하세요.

클라이언트 설정

참고

Databricks Connect 클라이언트 설정을 시작하기 전에 Databricks Connect에 대한 요구 사항을 충족해야 합니다.

1단계: 클라이언트 설치

  1. PySpark를 제거합니다. databricks-connect 패키지가 PySpark와 충돌하기 때문에 필요한 작업입니다. 자세한 내용은 PySpark 설치 충돌을 참조하세요.

    pip uninstall pyspark
    
  2. Databricks Connect 클라이언트를 설치합니다.

    pip install -U "databricks-connect==7.3.*"  # or X.Y.* to match your cluster version.
    

    참고

    항상 databricks-connect=X.Y 대신 databricks-connect==X.Y.*을(를) 지정하여 최신 패키지가 설치되어 있는지 확인합니다.

2단계: 연결 속성 구성

  1. 다음 구성 속성을 수집합니다.

    • Azure Databricks 작업 영역 URL.

    • Azure Databricks 개인용 액세스 토큰 또는 Azure Active Directory 토큰

    • 만든 클러스터 노드의 ID입니다. URL에서 클러스터 ID를 가져올 수 있습니다. 클러스터 ID는 1108-201635-xxxxxxxx입니다.

      클러스터 ID

    • 작업 영역의 고유한 조직 ID입니다. 작업 영역 자산에 대한 식별자 가져오기를 참조하세요.

    • Databricks Connect가 연결할 포트입니다. 기본 포트는 15001입니다. 클러스터가 Azure Databricks에 대한 이전 지침에 제공된 8787 등의 다른 포트를 사용하도록 구성된 경우, 구성된 포트 번호를 사용합니다.

  2. 연결을 구성합니다. CLI, SQL 구성 또는 환경 변수를 사용할 수 있습니다. 구성 방법의 우선 순위는 SQL 구성 키, CLI 및 환경 변수입니다.

    • CLI
      1. databricks-connect을 실행합니다.

        databricks-connect configure
        

        라이선스에 다음이 표시됩니다.

        Copyright (2018) Databricks, Inc.
        
        This library (the "Software") may not be used except in connection with the
        Licensee's use of the Databricks Platform Services pursuant to an Agreement
          ...
        
      2. 라이선스를 수락하고 구성 값을 제공합니다. Databricks 호스트Databricks 토큰의 경우 1단계에서 기록해 둔 작업 영역 URL 및 개인용 액세스 토큰을 입력합니다.

        Do you accept the above agreement? [y/N] y
        Set new config values (leave input empty to accept default):
        Databricks Host [no current value, must start with https://]: <databricks-url>
        Databricks Token [no current value]: <databricks-token>
        Cluster ID (e.g., 0921-001415-jelly628) [no current value]: <cluster-id>
        Org ID (Azure-only, see ?o=orgId in URL) [0]: <org-id>
        Port [15001]: <port>
        

    Azure Active Directory 토큰이 너무 길다는 메시지가 표시되면 Databricks 토큰 필드를 비워 두고 ~/.databricks-connect에 수동으로 토큰을 입력할 수 있습니다.

    • SQL 구성 또는 환경 변수입니다. 다음 표에서는 1단계에서 기록해 둔 구성 속성에 해당하는 SQL 구성 키 및 환경 변수를 보여줍니다. SQL 구성 키를 설정하려면 sql("set config=value")을(를) 사용합니다. 예: sql("set spark.databricks.service.clusterId=0304-201045-abcdefgh")

      매개 변수 SQL 구성 키 환경 변수 이름
      Databricks 호스트 spark.databricks.service.address DATABRICKS_ADDRESS
      Databricks 토큰 spark.databricks.service.token DATABRICKS_API_TOKEN
      클러스터 ID spark.databricks.service.clusterId DATABRICKS_CLUSTER_ID
      조직 ID spark.databricks.service.orgId DATABRICKS_ORG_ID
      포트 spark.databricks.service.port DATABRICKS_PORT
  3. Azure Databricks에 대한 연결을 테스트합니다.

    databricks-connect test
    

    구성한 클러스터가 실행되고 있지 않으면 테스트는 클러스터를 시작하며 해당 클러스터는 구성된 자동 수정 시간까지 계속 실행됩니다. 출력은 다음과 같습니다.

    * PySpark is installed at /.../3.5.6/lib/python3.5/site-packages/pyspark
    * Checking java version
    java version "1.8.0_152"
    Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
    Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
    * Testing scala command
    18/12/10 16:38:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    18/12/10 16:38:50 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
    18/12/10 16:39:53 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state
    18/12/10 16:39:59 WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.4.0-SNAPSHOT
          /_/
    
    Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_152)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> spark.range(100).reduce(_ + _)
    Spark context Web UI available at https://10.8.5.214:4040
    Spark context available as 'sc' (master = local[*], app id = local-1544488730553).
    Spark session available as 'spark'.
    View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
    View job details at <databricks-url>?o=0#/setting/clusters/<cluster-id>/sparkUi
    res0: Long = 4950
    
    scala> :quit
    
    * Testing python command
    18/12/10 16:40:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    18/12/10 16:40:17 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
    18/12/10 16:40:28 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state
    View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
    

IDE 또는 Notebook 서버 설정

이 섹션에서는 기본 설정 IDE 또는 Notebook 서버가 Databricks Connect 클라이언트를 사용하도록 구성하는 방법을 설명합니다.

이 섹션에서는 다음 작업을 수행합니다.

Jupyter Notebook

참고

로컬 개발의 경우 Databricks는 Databricks Labs에서 Databricks Connect가 아닌 dbx 사용을 권장합니다.

Databricks Connect 사용을 시작하기 전에 Databricks Connect에 대한 요구 사항을 충족하고 클라이언트를 설정해야 합니다.

Databricks Connect 구성 스크립트는 패키지를 자동으로 프로젝트 구성에 추가합니다. Python 커널을 시작하려면 다음을 실행합니다.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

SQL 쿼리를 실행하고 시각화하기 위해 %sql을(를) 약식으로 사용하도록 설정하려면 다음 코드 조각을 사용합니다.

from IPython.core.magic import line_magic, line_cell_magic, Magics, magics_class

@magics_class
class DatabricksConnectMagics(Magics):

   @line_cell_magic
   def sql(self, line, cell=None):
       if cell and line:
           raise ValueError("Line must be empty for cell magic", line)
       try:
           from autovizwidget.widget.utils import display_dataframe
       except ImportError:
           print("Please run `pip install autovizwidget` to enable the visualization widget.")
           display_dataframe = lambda x: x
       return display_dataframe(self.get_spark().sql(cell or line).toPandas())

   def get_spark(self):
       user_ns = get_ipython().user_ns
       if "spark" in user_ns:
           return user_ns["spark"]
       else:
           from pyspark.sql import SparkSession
           user_ns["spark"] = SparkSession.builder.getOrCreate()
           return user_ns["spark"]

ip = get_ipython()
ip.register_magics(DatabricksConnectMagics)

PyCharm

참고

로컬 개발의 경우 Databricks는 Databricks Labs에서 Databricks Connect가 아닌 dbx 사용을 권장합니다.

Databricks Connect 사용을 시작하기 전에 Databricks Connect에 대한 요구 사항을 충족하고 클라이언트를 설정해야 합니다.

Databricks Connect 구성 스크립트는 패키지를 자동으로 프로젝트 구성에 추가합니다.

Python 3 클러스터

  1. PyCharm 프로젝트를 만들 때 기존 인터프리터를 선택합니다. 드롭다운 메뉴에서 사용자가 만든 Conda 환경을 선택합니다(요구 사항 참조).

    인터프리터 선택

  2. 실행 > 구성 편집으로 이동합니다.

  3. PYSPARK_PYTHON=python3을(를) 환경 변수로 추가합니다.

    Python 3 클러스터 구성

SparkR 및 RStudio Desktop

참고

로컬 개발의 경우 Databricks는 Databricks Labs에서 Databricks Connect가 아닌 dbx 사용을 권장합니다.

Databricks Connect 사용을 시작하기 전에 Databricks Connect에 대한 요구 사항을 충족하고 클라이언트를 설정해야 합니다.

  1. 오픈 소스 Spark를 다운로드하여 로컬 컴퓨터에 압축을 풉니다. Azure Databricks 클러스터(Hadoop 2.7)와 동일한 버전을 선택합니다.

  2. databricks-connect get-jar-dir을 실행합니다. 이 명령은 /usr/local/lib/python3.5/dist-packages/pyspark/jars과(와) 같은 경로를 반환합니다. 한 디렉터리의 파일 경로를 JAR 디렉터리 파일 경로 위에 복사합니다(예: SPARK_HOME 디렉터리인 /usr/local/lib/python3.5/dist-packages/pyspark).

  3. R 스크립트의 맨 위에 Spark lib 경로 및 Spark 홈을 추가하여 구성합니다. <spark-lib-path>을(를) 1단계에서 오픈 소스 Spark 패키지를 압축 해제한 디렉터리로 설정합니다. <spark-home-path>을(를) 2단계의 Databricks Connect 디렉터리로 설정합니다.

    # Point to the OSS package path, e.g., /path/to/.../spark-2.4.0-bin-hadoop2.7
    library(SparkR, lib.loc = .libPaths(c(file.path('<spark-lib-path>', 'R', 'lib'), .libPaths())))
    
    # Point to the Databricks Connect PySpark installation, e.g., /path/to/.../pyspark
    Sys.setenv(SPARK_HOME = "<spark-home-path>")
    
  4. Spark 세션을 시작하고 SparkR 명령 실행을 시작합니다.

    sparkR.session()
    
    df <- as.DataFrame(faithful)
    head(df)
    
    df1 <- dapply(df, function(x) { x }, schema(df))
    collect(df1)
    

sparklyr 및 RStudio Desktop

중요

이 기능은 공개 미리 보기 상태입니다.

참고

로컬 개발의 경우 Databricks는 Databricks Labs에서 Databricks Connect가 아닌 dbx 사용을 권장합니다.

Databricks Connect 사용을 시작하기 전에 Databricks Connect에 대한 요구 사항을 충족하고 클라이언트를 설정해야 합니다.

Databricks Connect를 사용하여 로컬로 개발한 sparklyr 종속 코드를 복사하여 코드 변경을 최소화하거나 변경하지 않고 Azure Databricks Notebook 또는 Azure Databricks 작업 영역의 호스트된 RStudio Server에서 실행할 수 있습니다.

이 섹션에서는 다음 작업을 수행합니다.

요구 사항

  • sparklyr 1.2 이상.
  • 일치하는 Databricks Connect가 있는 Databricks Runtime 7.3 이상.

sparklyr 설치, 구성 및 사용

  1. RStudio Desktop에서 CRAN의 sparklyr 1.2 이상을 설치하거나 GitHub의 최신 마스터 버전을 설치합니다.

    # Install from CRAN
    install.packages("sparklyr")
    
    # Or install the latest master version from GitHub
    install.packages("devtools")
    devtools::install_github("sparklyr/sparklyr")
    
  2. Databricks Connect가 설치된 Python 환경을 활성화하고 터미널에서 다음 명령을 실행하여 <spark-home-path>을(를) 가져옵니다.

    databricks-connect get-spark-home
    
  3. Spark 세션을 시작하고 sparklyr 명령 실행을 시작합니다.

    library(sparklyr)
    sc <- spark_connect(method = "databricks", spark_home = "<spark-home-path>")
    
    iris_tbl <- copy_to(sc, iris, overwrite = TRUE)
    
    library(dplyr)
    src_tbls(sc)
    
    iris_tbl %>% count
    
  4. 연결을 닫습니다.

    spark_disconnect(sc)
    

리소스

자세한 내용은 sparklyr GitHub README를 참조하세요.

코드 예제는 sparklyr를 참조하세요.

sparklyr 및 RStudio Desktop 제한 사항

다음과 같은 기능은 지원되지 않습니다.

  • sparklyr 스트리밍 API
  • sparklyr ML API
  • broom API
  • csv_file 직렬화 모드
  • spark 제출

IntelliJ(Scala 또는 Java)

참고

로컬 개발의 경우 Databricks는 Databricks Labs에서 Databricks Connect가 아닌 dbx 사용을 권장합니다.

Databricks Connect 사용을 시작하기 전에 Databricks Connect에 대한 요구 사항을 충족하고 클라이언트를 설정해야 합니다.

  1. databricks-connect get-jar-dir을 실행합니다.

  2. 종속성을 명령에서 반환된 디렉터리로 가리킵니다. 파일 > 프로젝트 구조 > 모듈 > 종속성 > '+' 기호 > JAR 또는 디렉터리로 이동합니다.

    IntelliJ JAR

    충돌을 방지하려면 클래스 경로에서 다른 Spark 설치를 제거하는 것이 좋습니다. 이 작업이 불가능한 경우, 추가하는 JAR이 클래스 경로의 맨 앞에 있는지 확인합니다. 특히, 이는 설치된 다른 Spark 버전보다 앞서야 합니다. 그렇지 않으면 다른 Spark 버전 중 하나를 사용하고 로컬로 실행하거나 ClassDefNotFoundError을(를) throw하게 됩니다.

  3. IntelliJ에서 중단 옵션의 설정을 확인합니다. 기본값은 All이며 디버깅을 위해 중단점을 설정하는 경우 네트워크 시간 제한이 발생합니다. 백그라운드 네트워크 스레드를 중지하지 않도록 하려면 Thread로 설정합니다.

    IntelliJ 스레드

Eclipse

참고

로컬 개발의 경우 Databricks는 Databricks Labs에서 Databricks Connect가 아닌 dbx 사용을 권장합니다.

Databricks Connect 사용을 시작하기 전에 Databricks Connect에 대한 요구 사항을 충족하고 클라이언트를 설정해야 합니다.

  1. databricks-connect get-jar-dir을 실행합니다.

  2. 외부 JAR 구성을 명령에서 반환된 디렉터리로 가리킵니다. 프로젝트 메뉴 > 속성 > Java 빌드 경로 > 라이브러리 > 외부 Jar로 이동합니다.

    Eclipse 외부 JAR 구성

    충돌을 방지하려면 클래스 경로에서 다른 Spark 설치를 제거하는 것이 좋습니다. 이 작업이 불가능한 경우, 추가하는 JAR이 클래스 경로의 맨 앞에 있는지 확인합니다. 특히, 이는 설치된 다른 Spark 버전보다 앞서야 합니다. 그렇지 않으면 다른 Spark 버전 중 하나를 사용하고 로컬로 실행하거나 ClassDefNotFoundError을(를) throw하게 됩니다.

    Eclipse Spark 구성

Visual Studio Code

참고

로컬 개발의 경우 Databricks는 Databricks Labs에서 Databricks Connect가 아닌 dbx 사용을 권장합니다.

Databricks Connect 사용을 시작하기 전에 Databricks Connect에 대한 요구 사항을 충족하고 클라이언트를 설정해야 합니다.

  1. Python 확장이 설치되어 있는지 확인합니다.

  2. 명령 팔레트를 엽니다(macOS의 경우 Command+Shift+P, Windows/Linux의 경우 Ctrl+Shift+P).

  3. Python 인터프리터를 선택합니다. 코드 > 기본 설정 > 설정으로 이동하고 Python 설정을 선택합니다.

  4. databricks-connect get-jar-dir을 실행합니다.

  5. 명령에서 반환된 디렉터리를 python.venvPath의 사용자 설정 JSON에 추가합니다. 이는 Python 구성에 추가해야 합니다.

  6. Linter를 사용하지 않도록 설정합니다. 오른쪽에서 ...을 클릭하고 json 설정을 편집합니다. 수정된 설정은 다음과 같습니다.

    VS Code 구성

  7. VS Code Python용으로 개발하는 데 권장되는 가상 환경으로 실행하는 경우 명령 팔레트에 select python interpreter을(를) 입력하고 클러스터 Python 버전과 일치하는 환경으로 가리킵니다.

    Python 인터프리터 선택

    예를 들어 클러스터가 Python 3.5인 경우 로컬 환경은 Python 3.5여야 합니다.

    Python 버전

SBT

참고

로컬 개발의 경우 Databricks는 Databricks Labs에서 Databricks Connect가 아닌 dbx 사용을 권장합니다.

Databricks Connect 사용을 시작하기 전에 Databricks Connect에 대한 요구 사항을 충족하고 클라이언트를 설정해야 합니다.

SBT를 사용하려면 일반적인 Spark 라이브러리 종속성 대신 Databricks Connect JAR에 연결하도록 build.sbt 파일을 구성해야 합니다. unmanagedBase 기본 개체가 있는 Scala 앱을 가정하는 다음 예제 빌드 파일의 com.example.Test 지시문을 사용하여 이 작업을 수행합니다.

build.sbt

name := "hello-world"
version := "1.0"
scalaVersion := "2.11.6"
// this should be set to the path returned by ``databricks-connect get-jar-dir``
unmanagedBase := new java.io.File("/usr/local/lib/python2.7/dist-packages/pyspark/jars")
mainClass := Some("com.example.Test")

IDE에서 예제 실행

Java

import java.util.ArrayList;
import java.util.List;
import java.sql.Date;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.Dataset;

public class App {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
            .builder()
            .appName("Temps Demo")
            .config("spark.master", "local")
            .getOrCreate();

        // Create a Spark DataFrame consisting of high and low temperatures
        // by airport code and date.
        StructType schema = new StructType(new StructField[] {
            new StructField("AirportCode", DataTypes.StringType, false, Metadata.empty()),
            new StructField("Date", DataTypes.DateType, false, Metadata.empty()),
            new StructField("TempHighF", DataTypes.IntegerType, false, Metadata.empty()),
            new StructField("TempLowF", DataTypes.IntegerType, false, Metadata.empty()),
        });

        List<Row> dataList = new ArrayList<Row>();
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-03"), 52, 43));
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-02"), 50, 38));
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-01"), 52, 41));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-03"), 64, 45));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-02"), 61, 41));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-01"), 66, 39));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-03"), 57, 43));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-02"), 54, 39));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-01"), 56, 41));

        Dataset<Row> temps = spark.createDataFrame(dataList, schema);

        // Create a table on the Databricks cluster and then fill
        // the table with the DataFrame's contents.
        // If the table already exists from a previous run,
        // delete it first.
        spark.sql("USE default");
        spark.sql("DROP TABLE IF EXISTS demo_temps_table");
        temps.write().saveAsTable("demo_temps_table");

        // Query the table on the Databricks cluster, returning rows
        // where the airport code is not BLI and the date is later
        // than 2021-04-01. Group the results and order by high
        // temperature in descending order.
        Dataset<Row> df_temps = spark.sql("SELECT * FROM demo_temps_table " +
            "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
            "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
            "ORDER BY TempHighF DESC");
        df_temps.show();

        // Results:
        //
        // +-----------+----------+---------+--------+
        // |AirportCode|      Date|TempHighF|TempLowF|
        // +-----------+----------+---------+--------+
        // |        PDX|2021-04-03|       64|      45|
        // |        PDX|2021-04-02|       61|      41|
        // |        SEA|2021-04-03|       57|      43|
        // |        SEA|2021-04-02|       54|      39|
        // +-----------+----------+---------+--------+

        // Clean up by deleting the table from the Databricks cluster.
        spark.sql("DROP TABLE demo_temps_table");
    }
}

Python

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import date

spark = SparkSession.builder.appName('temps-demo').getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
    StructField('AirportCode', StringType(), False),
    StructField('Date', DateType(), False),
    StructField('TempHighF', IntegerType(), False),
    StructField('TempLowF', IntegerType(), False)
])

data = [
    [ 'BLI', date(2021, 4, 3), 52, 43],
    [ 'BLI', date(2021, 4, 2), 50, 38],
    [ 'BLI', date(2021, 4, 1), 52, 41],
    [ 'PDX', date(2021, 4, 3), 64, 45],
    [ 'PDX', date(2021, 4, 2), 61, 41],
    [ 'PDX', date(2021, 4, 1), 66, 39],
    [ 'SEA', date(2021, 4, 3), 57, 43],
    [ 'SEA', date(2021, 4, 2), 54, 39],
    [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS demo_temps_table')
temps.write.saveAsTable('demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM demo_temps_table " \
    "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
    "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
    "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE demo_temps_table')

Scala

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Date

object Demo {
  def main(args: Array[String]) {
      val spark = SparkSession.builder.master("local").getOrCreate()

      // Create a Spark DataFrame consisting of high and low temperatures
      // by airport code and date.
      val schema = StructType(Array(
        StructField("AirportCode", StringType, false),
        StructField("Date", DateType, false),
        StructField("TempHighF", IntegerType, false),
        StructField("TempLowF", IntegerType, false)
      ))

      val data = List(
        Row("BLI", Date.valueOf("2021-04-03"), 52, 43),
        Row("BLI", Date.valueOf("2021-04-02"), 50, 38),
        Row("BLI", Date.valueOf("2021-04-01"), 52, 41),
        Row("PDX", Date.valueOf("2021-04-03"), 64, 45),
        Row("PDX", Date.valueOf("2021-04-02"), 61, 41),
        Row("PDX", Date.valueOf("2021-04-01"), 66, 39),
        Row("SEA", Date.valueOf("2021-04-03"), 57, 43),
        Row("SEA", Date.valueOf("2021-04-02"), 54, 39),
        Row("SEA", Date.valueOf("2021-04-01"), 56, 41)
      )

      val rdd = spark.sparkContext.makeRDD(data)
      val temps = spark.createDataFrame(rdd, schema)

      // Create a table on the Databricks cluster and then fill
      // the table with the DataFrame's contents.
      // If the table already exists from a previous run,
      // delete it first.
      spark.sql("USE default")
      spark.sql("DROP TABLE IF EXISTS demo_temps_table")
      temps.write.saveAsTable("demo_temps_table")

      // Query the table on the Databricks cluster, returning rows
      // where the airport code is not BLI and the date is later
      // than 2021-04-01. Group the results and order by high
      // temperature in descending order.
      val df_temps = spark.sql("SELECT * FROM demo_temps_table " +
        "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
        "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
        "ORDER BY TempHighF DESC")
      df_temps.show()

      // Results:
      //
      // +-----------+----------+---------+--------+
      // |AirportCode|      Date|TempHighF|TempLowF|
      // +-----------+----------+---------+--------+
      // |        PDX|2021-04-03|       64|      45|
      // |        PDX|2021-04-02|       61|      41|
      // |        SEA|2021-04-03|       57|      43|
      // |        SEA|2021-04-02|       54|      39|
      // +-----------+----------+---------+--------+

      // Clean up by deleting the table from the Databricks cluster.
      spark.sql("DROP TABLE demo_temps_table")
  }
}

종속성이 있는 작업

일반적으로 주 클래스 또는 Python 파일에는 다른 종속성 JAR 및 파일이 있습니다. 이러한 종속성 JAR 및 파일은 sparkContext.addJar("path-to-the-jar") 또는 sparkContext.addPyFile("path-to-the-file")을(를) 호출하여 추가할 수 있습니다. addPyFile() 인터페이스를 사용하여 Egg 파일 및 zip 파일을 추가할 수도 있습니다. IDE에서 코드를 실행할 때마다 종속성 JAR 및 파일이 클러스터에 설치됩니다.

Python

from lib import Foo
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext
#sc.setLogLevel("INFO")

print("Testing simple count")
print(spark.range(100).count())

print("Testing addPyFile isolation")
sc.addPyFile("lib.py")
print(sc.parallelize(range(10)).map(lambda i: Foo(2)).collect())

class Foo(object):
  def __init__(self, x):
    self.x = x

Python + Java UDF

from pyspark.sql import SparkSession
from pyspark.sql.column import _to_java_column, _to_seq, Column

## In this example, udf.jar contains compiled Java / Scala UDFs:
#package com.example
#
#import org.apache.spark.sql._
#import org.apache.spark.sql.expressions._
#import org.apache.spark.sql.functions.udf
#
#object Test {
#  val plusOne: UserDefinedFunction = udf((i: Long) => i + 1)
#}

spark = SparkSession.builder \
  .config("spark.jars", "/path/to/udf.jar") \
  .getOrCreate()
sc = spark.sparkContext

def plus_one_udf(col):
  f = sc._jvm.com.example.Test.plusOne()
  return Column(f.apply(_to_seq(sc, [col], _to_java_column)))

sc._jsc.addJar("/path/to/udf.jar")
spark.range(100).withColumn("plusOne", plus_one_udf("id")).show()

Scala

package com.example

import org.apache.spark.sql.SparkSession

case class Foo(x: String)

object Test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      ...
      .getOrCreate();
    spark.sparkContext.setLogLevel("INFO")

    println("Running simple show query...")
    spark.read.format("parquet").load("/tmp/x").show()

    println("Running simple UDF query...")
    spark.sparkContext.addJar("./target/scala-2.11/hello-world_2.11-1.0.jar")
    spark.udf.register("f", (x: Int) => x + 1)
    spark.range(10).selectExpr("f(id)").show()

    println("Running custom objects query...")
    val objs = spark.sparkContext.parallelize(Seq(Foo("bye"), Foo("hi"))).collect()
    println(objs.toSeq)
  }
}

DBUtils에 액세스

Databricks Utilitiesdbutils.fsdbutils.secrets 유틸리티를 사용할 수 있습니다. 지원되는 명령은 dbutils.fs.cp, dbutils.fs.head, dbutils.fs.ls, dbutils.fs.mkdirs, dbutils.fs.mv, dbutils.fs.put, dbutils.fs.rm, dbutils.secrets.get, dbutils.secrets.getBytes, dbutils.secrets.list, dbutils.secrets.listScopes입니다. 파일 시스템 유틸리티(dbutils.fs)를 확인하거나 dbutils.fs.help()비밀 유틸리티(dbutils.secrets)를 실행하거나 dbutils.secrets.help()을(를) 실행합니다.

Python

from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils

spark = SparkSession.builder.getOrCreate()

dbutils = DBUtils(spark)
print(dbutils.fs.ls("dbfs:/"))
print(dbutils.secrets.listScopes())

Databricks Runtime 7.3 LTS 이상을 사용하는 경우, 로컬 및 Azure Databricks 클러스터 모두에서 작동하도록 DBUtils 모듈에 액세스하려면 다음 get_dbutils()을(를) 사용합니다.

def get_dbutils(spark):
  from pyspark.dbutils import DBUtils
  return DBUtils(spark)

또는 다음 get_dbutils()을(를) 사용합니다.

def get_dbutils(spark):
  if spark.conf.get("spark.databricks.service.client.enabled") == "true":
    from pyspark.dbutils import DBUtils
    return DBUtils(spark)
  else:
    import IPython
    return IPython.get_ipython().user_ns["dbutils"]

Scala

val dbutils = com.databricks.service.DBUtils
println(dbutils.fs.ls("dbfs:/"))
println(dbutils.secrets.listScopes())

로컬 및 원격 파일 시스템 간에 파일 복사

dbutils.fs을(를) 사용하여 클라이언트와 원격 파일 시스템 간에 파일을 복사할 수 있습니다. 스키마 file:/은(는) 클라이언트의 로컬 파일 시스템을 나타냅니다.

from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)

dbutils.fs.cp('file:/home/user/data.csv', 'dbfs:/uploads')
dbutils.fs.cp('dbfs:/output/results.csv', 'file:/home/user/downloads/')

이러한 방식으로 전송할 수 있는 최대 파일 크기는 250MB입니다.

dbutils.secrets.get를 입력합니다.

보안 제한 사항으로 인해 dbutils.secrets.get을(를) 호출하는 기능은 기본적으로 비활성화되어 있습니다. 작업 영역에서 이 기능을 사용하도록 설정하려면 Azure Databricks 지원에 문의하세요.

Hadoop 파일 시스템에 액세스

표준 Hadoop 파일 시스템 인터페이스를 사용하여 DBFS에 직접 액세스할 수도 있습니다.

> import org.apache.hadoop.fs._

// get new DBFS connection
> val dbfs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
dbfs: org.apache.hadoop.fs.FileSystem = com.databricks.backend.daemon.data.client.DBFS@2d036335

// list files
> dbfs.listStatus(new Path("dbfs:/"))
res1: Array[org.apache.hadoop.fs.FileStatus] = Array(FileStatus{path=dbfs:/$; isDirectory=true; ...})

// open file
> val stream = dbfs.open(new Path("dbfs:/path/to/your_file"))
stream: org.apache.hadoop.fs.FSDataInputStream = org.apache.hadoop.fs.FSDataInputStream@7aa4ef24

// get file contents as string
> import org.apache.commons.io._
> println(new String(IOUtils.toByteArray(stream)))

Hadoop 구성 설정

클라이언트에서 spark.conf.set API를 사용하여 Hadoop 구성을 설정할 수 있습니다. 이 구성은 SQL 및 DataFrame 작업에 적용됩니다. sparkContext에 설정된 Hadoop 구성은 클러스터 구성 또는 Notebook을 사용하여 설정해야 합니다. 이는 sparkContext에 설정된 구성이 사용자 세션에 연결되지 않고 전체 클러스터에 적용되기 때문입니다.

문제 해결

databricks-connect test을(를) 실행하여 연결 문제를 확인합니다. 이 섹션에서는 발생할 수 있는 몇 가지 일반적인 문제와 해결 방법을 설명합니다.

Python 버전 불일치

로컬로 사용 중인 Python 버전이 클러스터의 버전 이상의 부 릴리스를 가지고 있는지 확인합니다(예 3.5.13.5.2은(는) 괜찮으며 3.53.6은(는) 괜찮지 않음).

여러 Python 버전을 로컬로 설치한 경우 Databricks Connect가 PYSPARK_PYTHON 환경 변수(예: PYSPARK_PYTHON=python3)를 설정하여 올바른 버전을 사용하고 있는지 확인합니다.

서버 사용 안 함

클러스터에 Spark 서버가 spark.databricks.service.server.enabled true을(를) 통해 사용하도록 설정되어 있는지 확인합니다. 그런 경우 드라이버 로그에 다음과 같은 줄이 표시되어야 합니다.

18/10/25 21:39:18 INFO SparkConfUtils$: Set spark config:
spark.databricks.service.server.enabled -> true
...
18/10/25 21:39:21 INFO SparkContext: Loading Spark Service RPC Server
18/10/25 21:39:21 INFO SparkServiceRPCServer:
Starting Spark Service RPC Server
18/10/25 21:39:21 INFO Server: jetty-9.3.20.v20170531
18/10/25 21:39:21 INFO AbstractConnector: Started ServerConnector@6a6c7f42
{HTTP/1.1,[http/1.1]}{0.0.0.0:15001}
18/10/25 21:39:21 INFO Server: Started @5879ms

PySpark 설치 충돌

databricks-connect 패키지가 PySpark와 충돌합니다. 둘 다 설치하면 Python에서 Spark 컨텍스트를 초기화할 때 오류가 발생합니다. 이는 "스트림이 손상됨" 또는 "클래스를 찾을 수 없음" 오류를 포함하여 여러 가지 방법으로 매니페스트할 수 있습니다. Python 환경에 PySpark가 설치되어 있는 경우 databricks-connect를 설치하기 전에 PySpark가 제거되었는지 확인합니다. PySpark를 제거한 후 Databricks Connect 패키지를 완전히 다시 설치해야 합니다.

pip uninstall pyspark
pip uninstall databricks-connect
pip install -U "databricks-connect==9.1.*"  # or X.Y.* to match your cluster version.

SPARK_HOME 충돌

이전에 컴퓨터에서 Spark를 사용한 경우 Databricks Connect Spark가 아닌 다른 버전의 Spark 중 하나를 사용하도록 IDE를 구성할 수 있습니다. 이는 "스트림이 손상됨" 또는 "클래스를 찾을 수 없음" 오류를 포함하여 여러 가지 방법으로 매니페스트할 수 있습니다. SPARK_HOME 환경 변수의 값을 확인하여 사용 중인 Spark 버전을 확인할 수 있습니다.

Java

System.out.println(System.getenv("SPARK_HOME"));

Python

import os
print(os.environ['SPARK_HOME'])

Scala

println(sys.env.get("SPARK_HOME"))

해결 방법

SPARK_HOME이(가) 클라이언트의 Spark가 아닌 다른 버전의 Spark로 설정된 경우 SPARK_HOME 변수를 설정 해제하고 다시 시도해야 합니다.

IDE 환경 변수 설정, .bashrc, .zshrc 또는 .bash_profile 파일과 환경 변수를 설정할 수 있는 기타 위치를 확인합니다. 이전 상태를 제거하려면 IDE를 종료하고 다시 시작해야 할 가능성이 높으며 문제가 지속되면 새 프로젝트를 만들어야 할 수도 있습니다.

SPARK_HOME을(를) 새 값으로 설정할 필요는 없으며, 설정 해제만 하면 충분합니다.

이진 파일에 대한 PATH 항목 충돌 또는 누락

spark-shell과(와) 같은 명령이 Databricks Connect에서 제공된 이진 파일 대신 이전에 설치된 다른 이진 파일을 실행하도록 PATH가 구성되었을 가능성이 있습니다. 이 때문에 databricks-connect test이(가) 실패할 수 있습니다. Databricks Connect 이진 파일이 우선하는지 확인하거나 이전에 설치된 이진 파일을 제거해야 합니다.

spark-shell과(와) 같은 명령을 실행할 수 없는 경우 PATH가 pip install에 의해 자동으로 설정되지 않아 PATH에 설치 bin dir을 수동으로 추가해야 할 수도 있습니다. 설정되지 않은 경우에도 IDE와 함께 Databricks Connect를 사용할 수 있습니다. 그러나 databricks-connect test 명령은 작동하지 않습니다.

클러스터에서 직렬화 설정 충돌

databricks-connect test을(를) 실행할 때 "스트림이 손상되었습니다" 오류가 표시되는 경우 호환되지 않는 클러스터 직렬화 구성 때문일 수 있습니다. 예를 들어 spark.io.compression.codec 구성을 설정하면 이 문제가 발생할 수 있습니다. 이 문제를 해결하려면 클러스터 설정에서 이러한 구성을 제거하거나 Databricks Connect 클라이언트에서 구성을 설정하는 것이 좋습니다.

Windows에서 winutils.exe을(를) 찾을 수 없음

Windows에서 Databricks Connect를 사용하고 있으며 다음이 표시되는 경우:

ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

지침에 따라 Windows에서 Hadoop 경로를 구성합니다.

Windows에서 파일 이름, 디렉터리 이름 또는 볼륨 레이블 구문이 잘못됨

Windows에서 Databricks Connect를 사용하고 있으며 다음이 표시되는 경우:

The filename, directory name, or volume label syntax is incorrect.

Java 또는 Databricks Connect가 경로에 공백이 포함된 디렉터리에 설치되었습니다. 이 문제는 공백 없이 디렉터리 경로에 설치하거나 짧은 이름 양식을 사용하여 경로를 구성하면 해결할 수 있습니다.

Azure Active Directory 토큰을 사용한 인증

Databricks Connect를 사용하는 경우 개인용 액세스 토큰 대신 Azure Active Directory 토큰을 사용하여 인증할 수 있습니다. Azure Active Directory 토큰의 수명은 제한되어 있습니다. Azure Active Directory 토큰이 만료되면 Databricks Connect는 Invalid Token 오류와 함께 실패합니다.

Databricks Connect 7.3.5 이상에서는 실행 중인 Databricks Connect 애플리케이션에서 Azure Active Directory 토큰을 제공할 수 있습니다. 애플리케이션은 새 액세스 토큰을 획득하고 spark.databricks.service.token SQL 구성 키로 설정해야 합니다.

Python

spark.conf.set("spark.databricks.service.token", new_aad_token)

Scala

spark.conf.set("spark.databricks.service.token", newAADToken)

토큰을 업데이트한 후 애플리케이션은 세션의 컨텍스트에서 생성된 동일한 SparkSession 및 개체와 상태를 계속 사용할 수 있습니다. 일시적인 오류를 방지하기 위해 Databricks는 이전 토큰이 만료되기 전에 새 토큰을 제공하도록 권장합니다.

애플리케이션을 실행하는 동안 Azure Active Directory 토큰이 유지되도록 수명을 연장할 수 있습니다. 이렇게 하려면 액세스 토큰을 획득하는 데 사용한 Azure Active Directory 권한 부여 애플리케이션에 수명이 충분히 긴 TokenLifetimePolicy를 연결합니다.

참고

Azure Active Directory 통과에서는 두 가지 토큰을 사용합니다. 이는 각각 이전에 Databricks Connect에서 구성한다고 설명된 Azure Active Directory 액세스 토큰, 그리고 Databricks가 요청을 처리하는 동안 Databricks가 생성하는 특정 리소스에 대한 ADLS 통과 토큰입니다. Azure Active Directory 토큰 수명 정책을 사용하여 ADLS 통과 토큰의 수명을 연장할 수 없습니다. 1시간보다 오래 걸리는 명령을 클러스터에 보내는 경우, 명령이 1시간 후 ADLS 리소스에 액세스하면 실패합니다.

제한 사항

Databricks Connect는 다음 Azure Databricks 기능 및 타사 플랫폼을 지원하지 않습니다.

  • Unity 카탈로그.

  • 구조적 스트리밍.

  • 원격 클러스터에서 Spark 작업의 일부가 아닌 임의의 코드를 실행.

  • Delta 테이블 작업에 대한 네이티브 Scala, Python 및 R API(예: DeltaTable.forPath)는 지원되지 않습니다. 그러나 Delta Lake 작업이 있는 SQL API(spark.sql(...))와 Delta 테이블의 Spark API(예: spark.read.load)는 모두 지원됩니다.

  • Copy into.

  • Apache Zeppelin 0.7.x 이하.

  • 테이블 액세스 제어를 사용하여 클러스터에 연결.

  • 프로세스 격리를 사용하도록 설정된 클러스터에 연결(즉, spark.databricks.pyspark.enableProcessIsolation이(가) true(으)로 설정된 경우).

  • Delta CLONE SQL 명령.

  • 전역 임시 보기.

  • Koalas.

  • CREATE TABLE table AS SELECT ... SQL 명령이 항상 작동하는 것은 아닙니다. 대신 spark.sql("SELECT ...").write.saveAsTable("table") 을(를) 사용하세요.

  • Azure Active Directory 자격 증명 통과는 Databricks Runtime 7.3 LTS 이상을 실행하는 표준 클러스터에서만 지원되며 서비스 주체 인증과 호환되지 않습니다.

  • 다음과 같은 Databricks Utilities: