Tutorial: Develop a Databricks app with Streamlit

This tutorial shows how to build a Databricks app using the Databricks SQL Connector for Python and Streamlit. You'll learn how to develop an app that does the following:

  • Reads a Unity Catalog table and displays it in a Streamlit interface.
  • Edits the data and writes it back to the table.

Step 1: Configure privileges

These examples assume that your app uses app authorization. The app's service principal must have:

  • SELECT privilege on the Unity Catalog table
  • MODIFY privilege on the Unity Catalog table
  • CAN USE permission on the SQL warehouse

For more information, see the Unity Catalog privileges reference and SQL warehouse ACLs.
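
As a rough sketch of how the two table privileges listed above could be granted, the snippet below builds the corresponding GRANT statements as strings. The helper name build_table_grants, the table name, and the principal identifier are hypothetical placeholders. It assumes the service principal is referenced by its application ID; the CAN USE permission on the SQL warehouse is typically managed through the warehouse's permission settings, so it is not covered here.

```python
def build_table_grants(table_name: str, principal: str) -> list[str]:
    """Return GRANT statements giving `principal` read and write access to one table."""
    # Backticks quote the principal identifier, which usually contains hyphens.
    quoted = f"`{principal}`"
    return [
        f"GRANT SELECT ON TABLE {table_name} TO {quoted}",
        f"GRANT MODIFY ON TABLE {table_name} TO {quoted}",
    ]

# Hypothetical values, for illustration only
for stmt in build_table_grants(
    "main.default.orders", "00000000-0000-0000-0000-000000000000"
):
    print(stmt)
```

The printed statements could then be run by a user who is allowed to manage privileges on the table.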

Step 2: Install dependencies

Create a requirements.txt file and include the following packages:

databricks-sdk
databricks-sql-connector
streamlit
pandas

Step 3: Configure app execution

Create an app.yaml file to define how your app starts in Azure Databricks Apps.

command: ['streamlit', 'run', 'app.py']

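Step 4: Read a Unity Catalog table

This example code demonstrates how to read data from a Unity Catalog table and display it using Streamlit. Create an app.py file that meets the following objectives:

  • Uses app service principal authentication.
  • Prompts the user for the SQL warehouse HTTP path and the Unity Catalog table name.
  • Runs a SELECT * query on the specified table.
  • Displays the result in a Streamlit st.dataframe.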

app.py

import pandas as pd
import streamlit as st
from databricks import sql
from databricks.sdk.core import Config

cfg = Config()

# Use app service principal authentication
def get_connection(http_path):
    server_hostname = cfg.host
    if server_hostname.startswith('https://'):
        server_hostname = server_hostname.replace('https://', '')
    elif server_hostname.startswith('http://'):
        server_hostname = server_hostname.replace('http://', '')
    return sql.connect(
        server_hostname=server_hostname,
        http_path=http_path,
        credentials_provider=lambda: cfg.authenticate,
        _use_arrow_native_complex_types=False,
    )

# Read data from a Unity Catalog table and return it as a pandas DataFrame
def read_table(table_name: str, conn) -> pd.DataFrame:
    with conn.cursor() as cursor:
        cursor.execute(f"SELECT * FROM {table_name}")
        return cursor.fetchall_arrow().to_pandas()

# Use Streamlit input fields to accept user input
http_path_input = st.text_input(
    "Enter your Databricks HTTP Path:", placeholder="/sql/1.0/warehouses/xxxxxx"
)
table_name = st.text_input(
    "Specify a Unity Catalog table name:", placeholder="catalog.schema.table"
)

# Display the result in a Streamlit DataFrame
if http_path_input and table_name:
    conn = get_connection(http_path_input)
    df = read_table(table_name, conn)
    st.dataframe(df)
else:
    st.warning("Provide both the warehouse path and a table name to load data.")
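
Because read_table interpolates the user-supplied table name directly into the SELECT * statement, it may be worth checking the name before running the query. The following is a minimal sketch of such a check, not part of the original app: is_safe_table_name is a hypothetical helper, and it assumes table names made only of plain identifiers (names that need backtick quoting are rejected).

```python
import re

# One to three dot-separated parts, each a plain identifier
# (letters, digits, underscores; must not start with a digit).
_TABLE_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*){0,2}")

def is_safe_table_name(name: str) -> bool:
    """Return True if `name` looks like catalog.schema.table with plain identifiers."""
    return _TABLE_NAME.fullmatch(name) is not None

print(is_safe_table_name("main.default.orders"))           # True
print(is_safe_table_name("orders; DROP TABLE customers"))  # False
```

In app.py, a check like this could run before read_table is called, with st.error shown when the name is rejected.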

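Step 5: Edit a Unity Catalog table

This example code lets users read a Unity Catalog table, edit it with Streamlit's data editing features, and write the changes back. Add the following functionality to the app.py file:

  • Use INSERT OVERWRITE to write the updated data back to the table.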

app.py

import pandas as pd
import streamlit as st
from databricks import sql
from databricks.sdk.core import Config
import math

cfg = Config()

# Use app service principal authentication
def get_connection(http_path):
    server_hostname = cfg.host
    if server_hostname.startswith('https://'):
        server_hostname = server_hostname.replace('https://', '')
    elif server_hostname.startswith('http://'):
        server_hostname = server_hostname.replace('http://', '')
    return sql.connect(
        server_hostname=server_hostname,
        http_path=http_path,
        credentials_provider=lambda: cfg.authenticate,
        _use_arrow_native_complex_types=False,
    )

# Read data from a Unity Catalog table and return it as a pandas DataFrame
def read_table(table_name: str, conn) -> pd.DataFrame:
    with conn.cursor() as cursor:
        cursor.execute(f"SELECT * FROM {table_name}")
        return cursor.fetchall_arrow().to_pandas()

# Format values for SQL, handling NaN/None as NULL
def format_value(val):
    if val is None or (isinstance(val, float) and math.isnan(val)):
        return 'NULL'
    else:
        return repr(val)

# Use `INSERT OVERWRITE` to replace the table contents with the edited DataFrame
def insert_overwrite_table(table_name: str, df: pd.DataFrame, conn):
    progress = st.empty()
    with conn.cursor() as cursor:
        rows = list(df.itertuples(index=False))
        values = ",".join([f"({','.join(map(format_value, row))})" for row in rows])
        with progress:
            st.info("Calling Databricks SQL...")
        cursor.execute(f"INSERT OVERWRITE {table_name} VALUES {values}")
    progress.empty()
    st.success("Changes saved")

# Use Streamlit input fields to accept user input
http_path_input = st.text_input(
    "Enter your Databricks HTTP Path:", placeholder="/sql/1.0/warehouses/xxxxxx"
)
table_name = st.text_input(
    "Specify a Unity Catalog table name:", placeholder="catalog.schema.table"
)

# Show the table in an editable grid and save the changes on request
if http_path_input and table_name:
    conn = get_connection(http_path_input)
    if conn:
        st.success("✅ Connected successfully!")
        original_df = read_table(table_name, conn)
        edited_df = st.data_editor(original_df, num_rows="dynamic", hide_index=True)
        df_diff = pd.concat([original_df, edited_df]).drop_duplicates(keep=False)
        if not df_diff.empty:
            st.warning(f"⚠️ You have {len(df_diff) // 2} unsaved changes")
            if st.button("Save changes"):
                insert_overwrite_table(table_name, edited_df, conn)
                st.rerun()
else:
    st.warning("Provide both the warehouse path and a table name to load data.")
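
To make the write path more concrete, the following standalone sketch shows how a list of rows turns into the statement that insert_overwrite_table sends. It reuses the format_value logic from app.py on plain tuples instead of a DataFrame; build_insert_overwrite and the table name are hypothetical and used only for illustration.

```python
import math

def format_value(val):
    # Same rule as in app.py: None and NaN become SQL NULL,
    # every other value is rendered with repr()
    if val is None or (isinstance(val, float) and math.isnan(val)):
        return "NULL"
    return repr(val)

def build_insert_overwrite(table_name, rows):
    # Render each row as "(v1,v2,...)" and join the rows with commas
    values = ",".join(f"({','.join(map(format_value, row))})" for row in rows)
    return f"INSERT OVERWRITE {table_name} VALUES {values}"

rows = [(1, "apple", 0.5), (2, None, float("nan"))]
print(build_insert_overwrite("main.default.fruit", rows))
# INSERT OVERWRITE main.default.fruit VALUES (1,'apple',0.5),(2,NULL,NULL)
```

Because the statement replaces the whole table and inlines every value as a literal, this approach is probably best suited to small tables. An empty row list produces an incomplete statement, so a guard for that case may be worth adding.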

Next steps