這很重要
MySQL 連接器目前處於公開預覽階段。 請聯絡您的 Azure Databricks 帳戶團隊申請存取權限。
學習如何使用 Lakeflow Connect 將 MySQL 的資料匯入 Azure Databricks。 MySQL 連接器支援 Amazon RDS for MySQL、Aurora MySQL、Azure Database for MySQL、Google Cloud SQL for MySQL,以及在 EC2 上運行的 MySQL。
開始之前
若要建立擷取閘道和擷取管線,您必須符合下列需求:
您的工作區已啟用 Unity Catalog 功能。
您的工作區已啟用無伺服器計算。 請參閱 無伺服器運算需求。
如果您打算建立連線:您在中繼存放區上具有
CREATE CONNECTION許可權。如果您的連接器支援 UI 型管線撰寫,您可以完成此頁面上的步驟,同時建立連線和管線。 不過,如果您使用 API 型管線撰寫,您必須先在目錄總管中建立連線,才能完成此頁面上的步驟。 請參閱 連線到受管理的擷取來源。
如果您打算使用現有的連線:您有
USE CONNECTION許可權或ALL PRIVILEGES連線。您在某目標目錄中具有
USE CATALOG權限。您在現有架構上具有
USE SCHEMA、CREATE TABLE和CREATE VOLUME許可權,或在目標目錄上具有CREATE SCHEMA許可權。
建立叢集或自訂政策的無限制許可 (僅限 API)。 閘道的自訂原則必須符合下列需求:
類別:任務計算
政策組覆寫:
{ "cluster_type": { "type": "fixed", "value": "dlt" }, "num_workers": { "type": "unlimited", "defaultValue": 1, "isOptional": true }, "runtime_engine": { "type": "fixed", "value": "STANDARD", "hidden": true } }下列計算原則可讓 Azure Databricks 調整擷取閘道,以符合工作負載的需求。 最低需求是 4 核心。 然而,為了提升快照擷取效能,Databricks 建議使用較大的實例類型,擁有更多記憶體和 CPU 核心。
{ "driver_node_type_id": { "type": "fixed", "value": "Standard_E8d_v4" }, "node_type_id": { "type": "fixed", "value": "Standard_F4s" } }
如需叢集原則的詳細資訊,請參閱 選取運算原則。
要從 MySQL 匯入資料,你也必須完成 原始碼設定。
選項 1:Azure Databricks UI
系統管理員使用者可以在UI中同時建立連線和管線。 這是建立管理的資料擷取流程最簡單的方式。
在 Azure Databricks 工作區的側邊欄中,按一下 [ 數據擷取]。
在 「新增資料 」頁面,在 Databricks 連接器下方,點選 MySQL。 擷取精靈隨即開啟。
在精靈的 資料匯入閘道 頁面上,輸入一個獨一無二的名稱作為閘道名稱。
選取暫存擷取數據的目錄和架構,然後按 [ 下一步]。
在 [擷取管線] 頁面上,輸入管線的唯一名稱。
針對 目的地目錄,選取用於儲存匯入資料的目錄。
選取儲存存取源數據所需認證的 Unity 目錄連線。
如果沒有來源的現有連線,請按兩下 [建立連線 ],然後輸入您從 來源設定取得的驗證詳細數據。 您必須在中繼資料儲存庫上擁有
CREATE CONNECTION許可權。備註
使用或
sha256_password驗證的 MySQL 使用者可能會在使用測試caching_sha2_password按鈕時發生失敗。 這是已知限制。 你仍然可以繼續建立連結。按兩下 [建立管線,然後繼續。
在 原始碼 頁面,選擇要匯入的資料庫和資料表。
選擇性地變更默認歷程記錄追蹤設定。 如需相關資訊,請參閱 啟用歷程追蹤 (SCD 類型 2)。
按 [下一步]。
在 [ 目的地 ] 頁面上,選取要寫入的 Unity 目錄目錄和架構。
如果您不想使用現有的架構,請按兩下 [建立架構] 。 您必須擁有父目錄的
USE CATALOG和CREATE SCHEMA許可權。按一下 [儲存並繼續]。
(選擇性)在 [設定] 頁面上,按兩下 [建立排程]。 設定重新整理目的地數據表的頻率。
(選擇性)設定管線作業成功或失敗的電子郵件通知。
點擊儲存並執行管線。
選項二:程式化介面
在使用 Databricks 資產組合、Databricks API、Databricks SDK 或 Databricks CLI 進行擷取之前,您必須能夠存取現有的 Unity Catalog 連線。 有關操作說明,請參閱 連線到受控擷取來源。
建立預備目錄和架構
中繼目錄和架構可以與目的目錄和架構相同。 暫存目錄不能是外部目錄。
export CONNECTION_NAME="my_mysql_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_mysql_connector"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="mysql-instance.region.rds.amazonaws.com"
export DB_PORT="3306"
export DB_USER="databricks_replication"
export DB_PASSWORD="your_secure_password"
output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "MYSQL",
"options": {
"host": "'"$DB_HOST"'",
"port": "'"$DB_PORT"'",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')
export CONNECTION_ID=$(echo $output | jq -r '.connection_id')
建立閘道器與匯入流程
資料擷取閘道會從來源資料庫擷取快照和變更資料,並將其儲存在 Unity Catalog 的暫存磁碟區中。 您必須將網關作為長期管線來運行。 這可配合來源資料庫的 binlog 保留政策。
導入管線將快照和變更數據從暫存磁碟區應用到目的地串流數據表中。
Databricks 資產套件組合
此標籤頁說明如何使用 Databricks Asset Bundles 部署資料擷取管線。 套件組合可以包含作業和工作的 YAML 定義、使用 Databricks CLI 來管理,而且可以在不同的目標工作區中共用和執行 (例如開發、預備和生產環境)。 如需詳細資訊,請參閱 Databricks 資產套件組合。
使用 Databricks CLI 建立新的套件組合:
databricks bundle init將兩個新的資源檔案新增至套件組合:
- 管線定義檔 (
resources/mysql_pipeline.yml)。 - 控制數據擷取頻率的工作流程檔案 (
resources/mysql_job.yml)。
下列為
resources/mysql_pipeline.yml檔案範例:variables: # Common variables used multiple places in the DAB definition. gateway_name: default: mysql-gateway dest_catalog: default: main dest_schema: default: ingest-destination-schema resources: pipelines: gateway: name: ${var.gateway_name} gateway_definition: connection_name: <mysql-connection> gateway_storage_catalog: main gateway_storage_schema: ${var.dest_schema} gateway_storage_name: ${var.gateway_name} target: ${var.dest_schema} catalog: ${var.dest_catalog} pipeline_mysql: name: mysql-ingestion-pipeline ingestion_definition: ingestion_gateway_id: ${resources.pipelines.gateway.id} objects: # Modify this with your tables! - table: # Ingest the table mydb.customers to dest_catalog.dest_schema.customers source_schema: public source_table: customers destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} - schema: # Ingest all tables in the mydb.sales schema to dest_catalog.dest_schema # The destination table name will be the same as it is on the source source_schema: sales destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} target: ${var.dest_schema} catalog: ${var.dest_catalog}下列為
resources/mysql_job.yml檔案範例:resources: jobs: mysql_dab_job: name: mysql_dab_job trigger: # Run this job every day, exactly one day from the last run # See https://docs.databricks.com/api/workspace/jobs/create#trigger periodic: interval: 1 unit: DAYS email_notifications: on_failure: - <email-address> tasks: - task_key: refresh_pipeline pipeline_task: pipeline_id: ${resources.pipelines.pipeline_mysql.id}- 管線定義檔 (
使用 Databricks CLI 部署管線:
databricks bundle deploy
Notebook
更新下列筆記本中的 Configuration 儲存格,並提供來源連接、目標目錄、目標結構和數據表,以從來源提取數據。
建立閘道和資料輸入管線
CLI
若要建立閘道:
export GATEWAY_PIPELINE_NAME="mysql-gateway"
output=$(databricks pipelines create --json '{
"name": "'"$GATEWAY_PIPELINE_NAME"'",
"gateway_definition": {
"connection_id": "'"$CONNECTION_ID"'",
"gateway_storage_catalog": "'"$STAGING_CATALOG"'",
"gateway_storage_schema": "'"$STAGING_SCHEMA"'",
"gateway_storage_name": "'"$GATEWAY_PIPELINE_NAME"'"
}
}')
export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')
要建立資料引入管線:
export INGESTION_PIPELINE_NAME="mysql-ingestion-pipeline"
databricks pipelines create --json '{
"name": "'"$INGESTION_PIPELINE_NAME"'",
"ingestion_definition": {
"ingestion_gateway_id": "'"$GATEWAY_PIPELINE_ID"'",
"objects": [
{"table": {
"source_schema": "public",
"source_table": "customers",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'",
"destination_table": "customers"
}},
{"schema": {
"source_schema": "sales",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'"
}}
]
}
}'