本教學課程示範如何使用適用於 Python 和 Streamlit的 Databricks SQL 連接器來建置 Databricks 應用程式。 您將瞭解如何開發執行下列動作的應用程式:
- 讀取 Unity 目錄 資料表,並以 Streamlit 介面顯示。
- 編輯數據,並將它寫回數據表。
步驟:
步驟 1:設定權限
這些範例假設您的應用程式使用 應用程式授權。 您的應用程式服務主體必須具有:
-
SELECTUnity Catalog 資料表上的權限 -
MODIFYUnity Catalog 資料表上的權限 -
CAN USESQL 倉儲的權限
如需詳細資訊,請參閱 Unity Catalog 許可權和可保護物件 和 SQL 倉儲存取控制列表 (ACL)。
步驟 2:安裝相依性
在您的 requirements.txt 檔案中包含下列套件:
databricks-sdk
databricks-sql-connector
streamlit
pandas
備註
pandas 只有在編輯資料表資料時,才需要 。
步驟 3:閱讀 Unity 目錄表格
此範例示範如何從 Unity 目錄數據表讀取數據,並使用 Streamlit 加以顯示。 應用程式會執行下列步驟:
- 提示使用者輸入 SQL 倉儲 HTTP 路徑和 Unity 目錄數據表名稱。
- 使用 Databricks SQL 連接器為 Python 建立快取的 SQL 連線。
- 在
SELECT *指定的數據表上執行查詢。 - 在 Streamlit
st.dataframe中顯示結果。
app.py
import streamlit as st
from databricks import sql
from databricks.sdk.core import Config
cfg = Config() # Set the DATABRICKS_HOST environment variable when running locally
@st.cache_resource # connection is cached
def get_connection(http_path):
return sql.connect(
server_hostname=cfg.host,
http_path=http_path,
credentials_provider=lambda: cfg.authenticate,
)
def read_table(table_name, conn):
with conn.cursor() as cursor:
query = f"SELECT * FROM {table_name}"
cursor.execute(query)
return cursor.fetchall_arrow().to_pandas()
http_path_input = st.text_input(
"Enter your Databricks HTTP Path:", placeholder="/sql/1.0/warehouses/xxxxxx"
)
table_name = st.text_input(
"Specify a :re[UC] table name:", placeholder="catalog.schema.table"
)
if http_path_input and table_name:
conn = get_connection(http_path_input)
df = read_table(table_name, conn)
st.dataframe(df)
備註
- 此範例會使用
st.cache_resource來快取跨會話的資料庫連線並重新執行。 - 使用 Streamlit 輸入欄位 (
st.text_input) 接受使用者輸入。
步驟 4:編輯 Unity 目錄資料表
此範例讓使用者能閱讀、編輯並回寫 Unity 目錄資料表的變更,並利用 Streamlit 的資料編輯功能。 應用程式會執行下列步驟:
- 將原始數據表讀入 Pandas DataFrame。
- 在 Streamlit 編輯器中顯示資料表(
st.data_editor)。 - 偵測原始和編輯之 DataFrame 之間的變更。
- 使用
INSERT OVERWRITE將更新的數據寫回數據表。
app.py
import pandas as pd
import streamlit as st
from databricks import sql
from databricks.sdk.core import Config
cfg = Config() # Set the DATABRICKS_HOST environment variable when running locally
@st.cache_resource
def get_connection(http_path):
return sql.connect(
server_hostname=cfg.host,
http_path=http_path,
credentials_provider=lambda: cfg.authenticate,
)
def read_table(table_name: str, conn) -> pd.DataFrame:
with conn.cursor() as cursor:
cursor.execute(f"SELECT * FROM {table_name}")
return cursor.fetchall_arrow().to_pandas()
def insert_overwrite_table(table_name: str, df: pd.DataFrame, conn):
progress = st.empty()
with conn.cursor() as cursor:
rows = list(df.itertuples(index=False))
values = ",".join([f"({','.join(map(repr, row))})" for row in rows])
with progress:
st.info("Calling Databricks SQL...")
cursor.execute(f"INSERT OVERWRITE {table_name} VALUES {values}")
progress.empty()
st.success("Changes saved")
http_path_input = st.text_input(
"Specify the HTTP Path to your Databricks SQL Warehouse:",
placeholder="/sql/1.0/warehouses/xxxxxx",
)
table_name = st.text_input(
"Specify a Catalog table name:", placeholder="catalog.schema.table"
)
if http_path_input and table_name:
conn = get_connection(http_path_input)
original_df = read_table(table_name, conn)
edited_df = st.data_editor(original_df, num_rows="dynamic", hide_index=True)
df_diff = pd.concat([original_df, edited_df]).drop_duplicates(keep=False)
if not df_diff.empty:
if st.button("Save changes"):
insert_overwrite_table(table_name, edited_df, conn)
else:
st.warning("Provide both the warehouse path and a table name to load data.")
備註
- 應用程式會藉由計算原始數據表與編輯數據表之間的差異,來判斷是否需要更新。
- 進度列會在寫入作業期間,使用
st.info和st.success提供回饋。 - 此方法會取代數據表中的所有數據列。 針對局部更新,請使用不同的寫入策略。
後續步驟
- 將應用程式部署至您的工作區。 請參閱 部署 Databricks 應用程式。
- 限制適當使用者的存取權。 請參閱 設定 Databricks 應用程式的許可權。