本教程演示如何使用用于 Python 和 Streamlit 的 Databricks SQL 连接器生成 Databricks 应用。 你将了解如何开发执行以下作的应用:
- 读取 Unity 目录 表并将其显示在 Streamlit 接口中。
- 编辑数据并将其写回表。
步骤:
步骤 1:配置特权
这些示例假定应用使用 应用授权。 应用的服务主体必须具有:
- 对 Unity Catalog 表的
SELECT权限 - 对 Unity Catalog 表的
MODIFY权限 - 对 SQL 仓库的
CAN USE权限
有关详细信息,请参阅 Unity 目录特权和安全对象 以及 SQL 仓库 ACL。
步骤 2:安装依赖项
在 requirements.txt 文件中包括以下包:
databricks-sdk
databricks-sql-connector
streamlit
pandas
注释
仅当您正在编辑表格数据时,pandas 才是必需的。
步骤 3:读取 Unity 目录表
此示例演示如何从 Unity 目录表读取数据并使用 Streamlit 显示数据。 应用执行以下步骤:
- 提示用户输入 SQL 仓库 HTTP 路径和 Unity 目录表名称。
- 使用用于 Python 的 Databricks SQL 连接器建立缓存的 SQL 连接。
- 在指定的表上执行
SELECT *查询。 - 在 Streamlit
st.dataframe中显示结果。
app.py
import streamlit as st
from databricks import sql
from databricks.sdk.core import Config
cfg = Config() # Set the DATABRICKS_HOST environment variable when running locally
@st.cache_resource # connection is cached
def get_connection(http_path):
return sql.connect(
server_hostname=cfg.host,
http_path=http_path,
credentials_provider=lambda: cfg.authenticate,
)
def read_table(table_name, conn):
with conn.cursor() as cursor:
query = f"SELECT * FROM {table_name}"
cursor.execute(query)
return cursor.fetchall_arrow().to_pandas()
http_path_input = st.text_input(
"Enter your Databricks HTTP Path:", placeholder="/sql/1.0/warehouses/xxxxxx"
)
table_name = st.text_input(
"Specify a :re[UC] table name:", placeholder="catalog.schema.table"
)
if http_path_input and table_name:
conn = get_connection(http_path_input)
df = read_table(table_name, conn)
st.dataframe(df)
注释
- 此示例使用
st.cache_resource在不同会话中缓存数据库连接,并在需要时重新运行。 - 使用 Streamlit 输入字段 (
st.text_input) 接受用户输入。
步骤 4:编辑 Unity 目录表
此示例允许用户使用 Streamlit 的数据编辑功能读取、编辑和写回对 Unity 目录表的更改。 应用执行以下步骤:
- 将原始表读入 Pandas 数据帧。
- 在 Streamlit 编辑器中显示表(
st.data_editor)。 - 检测原始数据帧和已编辑的数据帧之间的更改。
- 使用
INSERT OVERWRITE将更新的数据写回到表中。
app.py
import pandas as pd
import streamlit as st
from databricks import sql
from databricks.sdk.core import Config
cfg = Config() # Set the DATABRICKS_HOST environment variable when running locally
@st.cache_resource
def get_connection(http_path):
return sql.connect(
server_hostname=cfg.host,
http_path=http_path,
credentials_provider=lambda: cfg.authenticate,
)
def read_table(table_name: str, conn) -> pd.DataFrame:
with conn.cursor() as cursor:
cursor.execute(f"SELECT * FROM {table_name}")
return cursor.fetchall_arrow().to_pandas()
def insert_overwrite_table(table_name: str, df: pd.DataFrame, conn):
progress = st.empty()
with conn.cursor() as cursor:
rows = list(df.itertuples(index=False))
values = ",".join([f"({','.join(map(repr, row))})" for row in rows])
with progress:
st.info("Calling Databricks SQL...")
cursor.execute(f"INSERT OVERWRITE {table_name} VALUES {values}")
progress.empty()
st.success("Changes saved")
http_path_input = st.text_input(
"Specify the HTTP Path to your Databricks SQL Warehouse:",
placeholder="/sql/1.0/warehouses/xxxxxx",
)
table_name = st.text_input(
"Specify a Catalog table name:", placeholder="catalog.schema.table"
)
if http_path_input and table_name:
conn = get_connection(http_path_input)
original_df = read_table(table_name, conn)
edited_df = st.data_editor(original_df, num_rows="dynamic", hide_index=True)
df_diff = pd.concat([original_df, edited_df]).drop_duplicates(keep=False)
if not df_diff.empty:
if st.button("Save changes"):
insert_overwrite_table(table_name, edited_df, conn)
else:
st.warning("Provide both the warehouse path and a table name to load data.")
注释
- 应用通过计算原始表和编辑表之间的差异来确定是否需要更新。
- 进度栏在使用
st.info和st.success进行写入操作期间提供反馈。 - 此方法替换表中的所有行。 对于部分更新,请使用其他写入策略。
后续步骤
- 将应用部署到工作区。 请参阅 部署 Databricks 应用。
- 限制对相应用户的访问权限。 请参阅 配置 Databricks 应用的权限。