이 자습서에서는 Python 및 Streamlit용 Databricks SQL 커넥터를 사용하여 Databricks 앱을 빌드하는 방법을 보여 줍니다. 다음을 수행하는 앱을 개발하는 방법을 알아봅니다.
- Unity 카탈로그 테이블을 읽고 Streamlit 인터페이스에 표시합니다.
- 데이터를 편집하고 테이블에 다시 씁니다.
1단계: 권한 구성
이러한 예제에서는 앱이 앱 권한 부여를 사용한다고 가정합니다. 앱의 서비스 주체에는 다음이 있어야 합니다.
-
SELECTUnity 카탈로그 테이블에 대한 권한 -
MODIFYUnity 카탈로그 테이블에 대한 권한 -
CAN USESQL 웨어하우스에 대한 권한
자세한 내용은 Unity 카탈로그 권한 참조 및 SQL Warehouse ACL을 참조하세요.
2단계: 종속성 설치
requirements.txt 파일을 만들고 다음 패키지를 포함합니다.
databricks-sdk
databricks-sql-connector
streamlit
pandas
3단계: 앱 실행 구성
app.yaml Azure Databricks Apps에서 앱이 시작되는 방법을 정의하는 파일을 만듭니다.
command: ['streamlit', 'run', 'app.py']
4단계: Unity 카탈로그 테이블 읽기
이 예제 코드는 Unity 카탈로그 테이블에서 데이터를 읽고 Streamlit를 사용하여 표시하는 방법을 보여 줍니다.
app.py 다음 목표를 충족하는 파일을 만듭니다.
- 앱 서비스 주체 인증을 사용합니다.
- 사용자에게 SQL Warehouse HTTP 경로 및 Unity 카탈로그 테이블 이름을 묻는 메시지를 표시합니다.
- 지정된 테이블에서 쿼리를 실행합니다
SELECT *. - Streamlit
st.dataframe에 결과를 표시합니다.
app.py
import pandas as pd
import streamlit as st
from databricks import sql
from databricks.sdk.core import Config
import os
cfg = Config()
# Use app service principal authentication
def get_connection(http_path):
server_hostname = cfg.host
if server_hostname.startswith('https://'):
server_hostname = server_hostname.replace('https://', '')
elif server_hostname.startswith('http://'):
server_hostname = server_hostname.replace('http://', '')
return sql.connect(
server_hostname=server_hostname,
http_path=http_path,
credentials_provider=lambda: cfg.authenticate,
_use_arrow_native_complex_types=False,
)
# Read data from a Unity Catalog table and return it as a pandas DataFrame
def read_table(table_name: str, conn) -> pd.DataFrame:
with conn.cursor() as cursor:
cursor.execute(f"SELECT * FROM {table_name}")
return cursor.fetchall_arrow().to_pandas()
# Use Streamlit input fields to accept user input
http_path_input = st.text_input(
"Enter your Databricks HTTP Path:", placeholder="/sql/1.0/warehouses/xxxxxx"
)
table_name = st.text_input(
"Specify a Unity Catalog table name:", placeholder="catalog.schema.table"
)
# Display the result in a Streamlit DataFrame
if http_path_input and table_name:
conn = get_connection(http_path_input)
df = read_table(table_name, conn)
st.dataframe(df)
else:
st.warning("Provide both the warehouse path and a table name to load data.")
5단계: Unity 카탈로그 테이블 편집
이 예제 코드에서는 사용자가 Streamlit의 데이터 편집 기능을 사용하여 Unity 카탈로그 테이블에 대한 변경 내용을 읽고, 편집하고, 쓸 수 있습니다. 파일에 다음 기능을 추가합니다.app.py
- 업데이트된 데이터를 테이블에 다시 쓰는 데 사용합니다
INSERT OVERWRITE.
app.py
import pandas as pd
import streamlit as st
from databricks import sql
from databricks.sdk.core import Config
import math
cfg = Config()
# Use app service principal authentication
def get_connection(http_path):
server_hostname = cfg.host
if server_hostname.startswith('https://'):
server_hostname = server_hostname.replace('https://', '')
elif server_hostname.startswith('http://'):
server_hostname = server_hostname.replace('http://', '')
return sql.connect(
server_hostname=server_hostname,
http_path=http_path,
credentials_provider=lambda: cfg.authenticate,
_use_arrow_native_complex_types=False,
)
# Read data from a Unity Catalog table and return it as a pandas DataFrame
def read_table(table_name: str, conn) -> pd.DataFrame:
with conn.cursor() as cursor:
cursor.execute(f"SELECT * FROM {table_name}")
return cursor.fetchall_arrow().to_pandas()
# Format values for SQL, handling NaN/None as NULL
def format_value(val):
if val is None or (isinstance(val, float) and math.isnan(val)):
return 'NULL'
else:
return repr(val)
# Use `INSERT OVERWRITE` to update existing rows and insert new ones
def insert_overwrite_table(table_name: str, df: pd.DataFrame, conn):
progress = st.empty()
with conn.cursor() as cursor:
rows = list(df.itertuples(index=False))
values = ",".join([f"({','.join(map(format_value, row))})" for row in rows])
with progress:
st.info("Calling Databricks SQL...")
cursor.execute(f"INSERT OVERWRITE {table_name} VALUES {values}")
progress.empty()
st.success("Changes saved")
# Use Streamlit input fields to accept user input
http_path_input = st.text_input(
"Enter your Databricks HTTP Path:", placeholder="/sql/1.0/warehouses/xxxxxx"
)
table_name = st.text_input(
"Specify a Unity Catalog table name:", placeholder="catalog.schema.table"
)
# Display the result in a Streamlit DataFrame
if http_path_input and table_name:
conn = get_connection(http_path_input)
if conn:
st.success("✅ Connected successfully!")
original_df = read_table(table_name, conn)
edited_df = st.data_editor(original_df, num_rows="dynamic", hide_index=True)
df_diff = pd.concat([original_df, edited_df]).drop_duplicates(keep=False)
if not df_diff.empty:
st.warning(f"⚠️ You have {len(df_diff) // 2} unsaved changes")
if st.button("Save changes"):
insert_overwrite_table(table_name, edited_df, conn)
st.rerun()
else:
st.warning("Provide both the warehouse path and a table name to load data.")
다음 단계
- 작업 영역에 앱을 배포합니다. Databricks 앱 배포를 참조하세요.
- 적절한 사용자에 대한 액세스를 제한합니다. Databricks 앱에 대한 권한 구성을 참조하세요.