Azure Databricks クラスターに対するリージョンのディザスター リカバリー

この記事では、Azure Databricks クラスターに適したディザスター リカバリー アーキテクチャと、その設計を実現する手順について説明します。

Azure Databricks のアーキテクチャ

Azure portal から Azure Databricks ワークスペースを作成すると、選択した Azure リージョン (たとえば、米国西部) に、ご利用のサブスクリプションの Azure リソースとしてマネージド アプリケーションがデプロイされます。 このアプライアンスは、ご利用のサブスクリプションで利用できる Azure Storage アカウントおよびネットワーク セキュリティ グループと共に Azure Virtual Network にデプロイされます。 この仮想ネットワークが Databricks ワークスペースに対する境界レベルのセキュリティを提供し、またネットワーク セキュリティ グループによって保護されることになります。 Databricks クラスターは、ワークスペース内でワーカーとドライバーの VM の種類、Databricks Runtime のバージョンを指定することによって作成できます。 永続化されたデータは、ストレージ アカウントで使用できます。 クラスターの作成後は、ノートブック、REST API、ODBC または JDBC のエンドポイントを特定のクラスターにアタッチすることにより、そのエンドポイントを介してジョブを実行することができます。

Databricks ワークスペース環境の管理と監視は、Databricks のコントロール プレーンで行います。 クラスターの作成をはじめとするすべての管理操作は、コントロール プレーンから開始されます。 スケジュールされたジョブなどのすべてのメタデータは、Azure Database に格納され、データベース バックアップは自動的にそれが実装されているペア リージョンに geo レプリケートされます

Databricks architecture

このアーキテクチャの利点の 1 つは、ユーザーがそのアカウントにある任意のストレージ リソースに Azure Databricks を接続できることです。 計算 (Azure Databricks) とストレージの両方を別々にスケーリングできることが主な利点となります。

リージョンのディザスター リカバリーのトポロジを作成する方法

前述したアーキテクチャの説明を見てもわかるように、Azure Databricks を使ったビッグ データ パイプラインには、Azure Storage、Azure Database などの各種データ ソースをはじめ、さまざまなコンポーネントが存在します。 Azure Databricks は、ビッグ データ パイプラインの "計算" をつかさどるコンポーネントです。 このコンポーネントは "一時性" を特徴としています。つまり、データはあくまで Azure Storage に格納されますが、不要なときには計算料金が課金されないよう、"計算" コンポーネント (Azure Databricks クラスター) を終了することができます。 ジョブの待ち時間が大きくならないよう、"計算" コンポーネント (Azure Databricks) とストレージ ソースは同じリージョン内に存在する必要があります。

リージョンのディザスター リカバリーのトポロジを独自に作成するには、次の要件を満たす必要があります。

  1. 複数の Azure Databricks ワークスペースは、別個の Azure リージョンにプロビジョニングします。 たとえば、米国東部にプライマリ Azure Databricks ワークスペースを作成します。 セカンダリ ディザスター リカバリー Azure Databricks ワークスペースは、別のリージョン (米国西部など) に作成する必要があります。 ペアになっている Azure リージョンの一覧については、「リージョン間レプリケーション」をご参照ください。 Azure Databricks リージョンについて詳しくは、「サポートされているリージョン」をご参照ください。

  2. geo 冗長ストレージを使用します。 既定では、Azure Databricks に関連付けられているデータは Azure Storage に格納され、Databricks ジョブの結果は Azure Blob Storage に格納されます。そのため、処理済みのデータは永続的であり、クラスターが終了した後も高可用性が維持されます。 クラスター ストレージとジョブ ストレージは、同じ可用性ゾーンにあります。 リージョンの利用不可から保護するために、Azure Databricks ワークスペースでは既定で geo 冗長ストレージが使用されています。 geo 冗長ストレージでは、データは Azure ペア リージョンにレプリケートされます。 Databricks では、geo 冗長ストレージの既定値を維持することが推奨されますが、代わりにローカル冗長ストレージを使用する必要がある場合は、ワークスペースの ARM テンプレートstorageAccountSkuNameStandard_LRS に設定できます。

  3. セカンダリ リージョンの作成後、ユーザー、ユーザーのフォルダー、ノートブック、クラスター構成、ジョブ構成、ライブラリ、ストレージ、初期化スクリプトを移行し、アクセスの制御を再構成する必要があります。 その他詳しい情報については、次のセクションで取り上げます。

地域的な災害

それぞれの地域で発生する災害に備えるために、別の Azure Databricks ワークスペースのセットをセカンダリ リージョンに明示的に維持する必要があります。 「ディザスター リカバリー」を参照してください。

ディザスター リカバリーに推奨されるツールは、主に Terraform (インフラ レプリケーション用) と Delta Deep Clone (データ レプリケーション用) です。

詳細な移行手順

  1. お使いのコンピューター上で Databricks のコマンド ライン インターフェイスを設定する

    このコマンド ライン インターフェイスには、Azure Databricks REST API が使いやすいようラップされています。そのためこの記事では、自動化された手順の大半でコマンド ライン インターフェイスを使用したコード例が多くなっています。

    移行手順を実施する前に、実際に作業を行う予定のデスクトップ コンピューターまたは仮想マシンに databricks-cli をインストールしてください。 詳しくは、Databricks CLI のインストールに関するページをご覧ください。

    pip install databricks-cli
    

    注意

    この記事に掲載されている python スクリプトは、実行環境として Python 2.7+ < 3.x が想定されています。

  2. 2 つのプロファイルを構成する

    1 つはプライマリ ワークスペース用に、もう 1 つはセカンダリ ワークスペース用に構成します。

    databricks configure --profile primary --token
    databricks configure --profile secondary --token
    

    以降、この記事の各手順に出現するコード ブロックでは、対応するワークスペース コマンドを使ってプロファイルを切り替えます。 コード ブロックごとに、実際に作成するプロファイルの名前で置き換えてください。

    EXPORT_PROFILE = "primary"
    IMPORT_PROFILE = "secondary"
    

    必要であれば、コマンド ラインから手動で切り替えることもできます。

    databricks workspace ls --profile primary
    databricks workspace ls --profile secondary
    
  3. Microsoft Entra ID (旧称 Azure Active Directory) のユーザーを移行する

    プライマリ ワークスペースに存在するのと同じ Microsoft Entra ID (旧称 Azure Active Directory) ユーザーをセカンダリ ワークスペースに手動で追加します。

  4. ユーザー フォルダーとノートブックを移行する

    以下の Python コードを使用して、サンドボックス化されたユーザー環境を移行します。この環境に、ユーザーごとの入れ子になったフォルダー構造やノートブックが含まれます。

    Note

    この手順でライブラリはコピーされません。この操作は、基になる API でサポートされていません。

    次の Python スクリプトをファイルにコピーして保存し、Databricks コマンド ラインから実行します。 たとえば、「 python scriptname.py 」のように入力します。

    import sys
    import os
    import subprocess
    from subprocess import call, check_output
    
    EXPORT_PROFILE = "primary"
    IMPORT_PROFILE = "secondary"
    
    # Get a list of all users
    user_list_out = check_output(["databricks", "workspace", "ls", "/Users", "--profile", EXPORT_PROFILE])
    user_list = (user_list_out.decode(encoding="utf-8")).splitlines()
    
    print (user_list)
    
    # Export sandboxed environment(folders, notebooks) for each user and import into new workspace.
    #Libraries are not included with these APIs / commands.
    
    for user in user_list:
      #print("Trying to migrate workspace for user ".decode() + user)
      print (("Trying to migrate workspace for user ") + user)
    
      subprocess.call(str("mkdir -p ") + str(user), shell = True)
      export_exit_status = call("databricks workspace export_dir /Users/" + str(user) + " ./" + str(user) + " --profile " + EXPORT_PROFILE, shell = True)
    
      if export_exit_status==0:
        print ("Export Success")
        import_exit_status = call("databricks workspace import_dir ./" + str(user) + " /Users/" + str(user) + " --profile " + IMPORT_PROFILE, shell=True)
        if import_exit_status==0:
          print ("Import Success")
        else:
          print ("Import Failure")
      else:
        print ("Export Failure")
    print ("All done")
    
  5. クラスターの構成を移行する

    ノートブックの移行後、必要に応じてクラスターの構成を新しいワークスペースに移行することができます。 この手順は databricks-cli を使ってほぼすべて自動化されています (クラスターの構成をすべて移行するのではなく、自身で選んで移行する場合を除く)。

    注意

    クラスター構成の作成エンドポイントは、あいにく存在しません。また、このスクリプトは、各クラスターを直ちに作成しようと試みます。 ご利用のサブスクリプションで使用できるコア数が不足していた場合、クラスターの作成に失敗します。 構成が正常に転送されれば、このエラーは無視してしてかまいません。

    次に示したスクリプトは、以前のクラスター ID から新しいクラスター ID へのマッピングを出力します。このマッピングは、後からジョブ (既存のクラスターを使うように構成されたジョブ) の移行に使用します。

    次の Python スクリプトをファイルにコピーして保存し、Databricks コマンド ラインから実行します。 たとえば、「 python scriptname.py 」のように入力します。

    import sys
    import os
    import subprocess
    import json
    from subprocess import call, check_output
    
    EXPORT_PROFILE = "primary"
    IMPORT_PROFILE = "secondary"
    
    # Get all clusters info from old workspace
    clusters_out = check_output(["databricks", "clusters", "list",    "--profile", EXPORT_PROFILE])
    clusters_info_list = str(clusters_out.decode(encoding="utf-8")).   splitlines()
    print("Printting Cluster info List")
    print(clusters_info_list)
    
    # Create a list of all cluster ids
    clusters_list = []
    ##for cluster_info in clusters_info_list: clusters_list.append   (cluster_info.split(None, 1)[0])
    
    for cluster_info in clusters_info_list:
       if cluster_info != '':
          clusters_list.append(cluster_info.split(None, 1)[0])
    
    # Optionally filter cluster ids out manually, so as to create only required ones in new workspace
    
    # Create a list of mandatory / optional create request elements
    cluster_req_elems = ["num_workers","autoscale","cluster_name","spark_version","spark_conf","node_type_id","driver_node_type_id","custom_tags","cluster_log_conf","spark_env_vars","autotermination_minutes","enable_elastic_disk"]
    print("Printing Cluster element List")
    print (cluster_req_elems)
    print(str(len(clusters_list)) + " clusters found in the primary site" )
    
    print ("---------------------------------------------------------")
    # Try creating all / selected clusters in new workspace with same config as in old one.
    cluster_old_new_mappings = {}
    i = 0
    for cluster in clusters_list:
       i += 1
       print("Checking cluster " + str(i) + "/" + str(len(clusters_list)) + " : " +str(cluster))
       cluster_get_out_f = check_output(["databricks", "clusters", "get", "--cluster-id", str(cluster), "--profile", EXPORT_PROFILE])
       cluster_get_out=str(cluster_get_out_f.decode(encoding="utf-8"))
       print ("Got cluster config from old workspace")
       print (cluster_get_out)
        # Remove extra content from the config, as we need to build create request with allowed elements only
       cluster_req_json = json.loads(cluster_get_out)
       cluster_json_keys = cluster_req_json.keys()
    
       #Don't migrate Job clusters
       if cluster_req_json['cluster_source'] == u'JOB' :
          print ("Skipping this cluster as it is a Job cluster : " + cluster_req_json['cluster_id'] )
          print ("---------------------------------------------------------")
          continue
    
          #cluster_req_json.pop(key, None)
          for key in cluster_json_keys:
            if key not in cluster_req_elems:
             print (cluster_req_json)
             #cluster_del_item=cluster_json_keys .keys()
             cluster_req_json.popitem(key, None)
    
       # Create the cluster, and store the mapping from old to new cluster ids
    
       #Create a temp file to store the current cluster info as JSON
       strCurrentClusterFile = "tmp_cluster_info.json"
    
       #delete the temp file if exists
       if os.path.exists(strCurrentClusterFile) :
          os.remove(strCurrentClusterFile)
    
       fClusterJSONtmp = open(strCurrentClusterFile,"w+")
       fClusterJSONtmp.write(json.dumps(cluster_req_json))
       fClusterJSONtmp.close()
    
       #cluster_create_out = check_output(["databricks", "clusters", "create", "--json", json.dumps(cluster_req_json), "--profile", IMPORT_PROFILE])
       cluster_create_out = check_output(["databricks", "clusters", "create", "--json-file", strCurrentClusterFile , "--profile", IMPORT_PROFILE])
       cluster_create_out_json = json.loads(cluster_create_out)
       cluster_old_new_mappings[cluster] = cluster_create_out_json['cluster_id']
    
       print ("Cluster create request sent to secondary site workspace successfully")
       print ("---------------------------------------------------------")
    
       #delete the temp file if exists
       if os.path.exists(strCurrentClusterFile) :
          os.remove(strCurrentClusterFile)
    
    print ("Cluster mappings: " + json.dumps(cluster_old_new_mappings))
    print ("All done")
    print ("P.S. : Please note that all the new clusters in your secondary site are being started now!")
    print ("       If you won't use those new clusters at the moment, please don't forget terminating your new clusters to avoid charges")
    
  6. ジョブの構成を移行する

    前の手順でクラスターの構成を移行した場合、必要であれば、ジョブの構成を新しいワークスペースに移行することができます。 この手順は databricks-cli を使ってすべて自動化されています (ジョブの構成をすべて移行するのではなく、自身で選んで移行する場合を除く)。

    注意

    スケジュールされたジョブの構成には、"スケジュール" 情報も含まれています。そのため既定では、移行後すぐに、構成されているタイミングで動作が開始されます。 したがって、以前のワークスペースと新しいワークスペースが重複して実行されるのを防ぐため、次のコード ブロックでは、スケジュール情報を移行中にすべて削除しています。 そのようなジョブについては、切り替えの準備が完了した後で、スケジュールを構成してください。

    ジョブの構成には、新しいクラスターまたは既存のクラスターの設定が必要です。 既存のクラスターが使用されている場合、以下のスクリプト/コードは、以前のクラスター ID を新しいクラスター ID に置き換えようと試みます。

    以下の Python スクリプトをファイルにコピーして保存してください。 old_cluster_idnew_cluster_id の値は、前の手順 (クラスターの移行) で得られた出力内容に置き換えてください。 これを databricks-cli コマンド ラインで実行します (例: python scriptname.py)。

    import sys
    import os
    import subprocess
    import json
    from subprocess import call, check_output
    
    
    EXPORT_PROFILE = "primary"
    IMPORT_PROFILE = "secondary"
    
    # Please replace the old to new cluster id mappings from cluster migration output
    cluster_old_new_mappings = {"0227-120427-tryst214": "0229-032632-paper88"}
    
    # Get all jobs info from old workspace
    try:
      jobs_out = check_output(["databricks", "jobs", "list", "--profile", EXPORT_PROFILE])
      jobs_info_list = jobs_out.splitlines()
    except:
      print("No jobs to migrate")
      sys.exit(0)
    
    # Create a list of all job ids
    jobs_list = []
    for jobs_info in jobs_info_list:
      jobs_list.append(jobs_info.split(None, 1)[0])
    
    # Optionally filter job ids out manually, so as to create only required ones in new workspace
    
    # Create each job in the new workspace based on corresponding settings in the old workspace
    
    for job in jobs_list:
      print("Trying to migrate ") + job
    
      job_get_out = check_output(["databricks", "jobs", "get", "--job-id", job, "--profile", EXPORT_PROFILE])
      print("Got job config from old workspace")
    
      job_req_json = json.loads(job_get_out)
      job_req_settings_json = job_req_json['settings']
    
      # Remove schedule information so job doesn't start before proper cutover
      job_req_settings_json.pop('schedule', None)
    
      # Replace old cluster id with new cluster id, if job configured to run against an existing cluster
      if 'existing_cluster_id' in job_req_settings_json:
        if job_req_settings_json['existing_cluster_id'] in cluster_old_new_mappings:
          job_req_settings_json['existing_cluster_id'] = cluster_old_new_mappings[job_req_settings_json['existing_cluster_id']]
        else:
          print("Mapping not available for old cluster id ") + job_req_settings_json['existing_cluster_id']
          continue
    
      call(["databricks", "jobs", "create", "--json", json.dumps(job_req_settings_json), "--profile", IMPORT_PROFILE])
      print("Sent job create request to new workspace successfully")
    
    print("All done")
    
  7. ライブラリを移行する

    現在、ライブラリをワークスペース間で直接移行する方法はありません。 そうしたライブラリは、新しいワークスペースに手動で再インストールする必要があります。 カスタム ライブラリをワークスペースにアップロードする DBFS CLILibraries CLI とを組み合わせて自動化することはできます。

  8. Azure Blob Storage と Azure Data Lake Storage のマウントを移行する

    Notebook ベースのソリューションを使って、Azure Blob Storage と Azure Data Lake Storage (Gen 2) のマウント ポイントをすべて手動で再マウントします。 プライマリ ワークスペースにはストレージ リソースがマウント済みかと思いますので、それをセカンダリ ワークスペースについても行う必要があります。 マウントのための外部 API は存在しません。

  9. クラスター初期化スクリプトを移行する

    すべてのクラスター初期化スクリプトは、DBFS CLI を使用して、以前のワークスペースから新しいワークスペースに移行できます。 まず、必要なスクリプトを dbfs:/dat abricks/init/.. からローカル デスクトップまたは仮想マシンにコピーします。 次に、それらのスクリプトを新しいワークスペースに同じパスでコピーします。

    // Primary to local
    dbfs cp -r dbfs:/databricks/init ./old-ws-init-scripts --profile primary
    
    // Local to Secondary workspace
    dbfs cp -r old-ws-init-scripts dbfs:/databricks/init --profile secondary
    
  10. アクセスの制御を手動で再構成して再適用する

    既存のプライマリ ワークスペースが Premium または Enterpriseレベル (SKU) を使用するように構成されている場合、高い確率で Access Control 機能も使用していることが考えられます。

    Access Control 機能をご利用の場合は、アクセスの制御を各種リソース (ノートブック、クラスター、ジョブ、テーブル) に手動で再適用してください。

Azure エコシステムのディザスター リカバリー

他の Azure サービスを使用している場合は、それらのサービスにもディザスター リカバリーのベスト プラクティスを必ず実装してください。 たとえば、外部の Hive metastore インスタンスを使用する場合は、Azure SQL DatabaseAzure HDInsightAzure Database for MySQL のいずれかまたはすべてのディザスター リカバリーを検討する必要があります。 ディザスター リカバリーの全般的な情報については、「Azure アプリケーションのディザスター リカバリー」を参照してください。

次の手順

詳しくは、Azure Databricks のドキュメントをご覧ください。