Przykłady kodu dla usługi Databricks Połączenie dla języka Python
Uwaga
W tym artykule opisano Połączenie databricks dla środowiska Databricks Runtime 13.0 lub nowszego.
Ten artykuł zawiera przykłady kodu, które używają usługi Databricks Połączenie dla języka Python. Usługa Databricks Połączenie umożliwia łączenie popularnych środowisk IDE, serwerów notesów i aplikacji niestandardowych z klastrami usługi Azure Databricks. Zobacz Co to jest usługa Databricks Połączenie?. Aby zapoznać się z wersją języka Scala tego artykułu, zobacz Przykłady kodu dla usługi Databricks Połączenie dla języka Scala.
Uwaga
Przed rozpoczęciem korzystania z usługi Databricks Połączenie należy skonfigurować klienta usługi Databricks Połączenie.
Usługa Databricks udostępnia kilka dodatkowych przykładowych aplikacji, które pokazują, jak używać Połączenie usługi Databricks. Zobacz przykładowe aplikacje dla repozytorium usługi Databricks Połączenie w usłudze GitHub, w szczególności:
- Prosta aplikacja ETL
- Interaktywna aplikacja danych oparta na narzędziu Plotly
- Interaktywna aplikacja danych oparta na sztucznej inteligencji Plotly i PySpark
Możesz również użyć następujących prostszych przykładów kodu do eksperymentowania z usługą Databricks Połączenie. W tych przykładach przyjęto założenie, że używasz domyślnego uwierzytelniania dla usługi Databricks Połączenie konfiguracji klienta.
Ten prosty przykład kodu wysyła zapytanie do określonej tabeli, a następnie pokazuje pierwsze 5 wierszy określonej tabeli. Aby użyć innej tabeli, dostosuj wywołanie do spark.read.table
.
from databricks.connect import DatabricksSession
spark = DatabricksSession.builder.getOrCreate()
df = spark.read.table("samples.nyctaxi.trips")
df.show(5)
Ten dłuższy przykład kodu wykonuje następujące czynności:
- Tworzy ramkę danych w pamięci.
- Tworzy tabelę o nazwie
zzz_demo_temps_table
w schemaciedefault
. Jeśli tabela o tej nazwie już istnieje, tabela zostanie usunięta jako pierwsza. Aby użyć innego schematu lub tabeli, dostosuj wywołania dospark.sql
,temps.write.saveAsTable
lub obu. - Zapisuje zawartość ramki danych w tabeli.
SELECT
Uruchamia zapytanie dotyczące zawartości tabeli.- Pokazuje wynik zapytania.
- Usuwa tabelę.
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date
spark = DatabricksSession.builder.getOrCreate()
# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
StructField('AirportCode', StringType(), False),
StructField('Date', DateType(), False),
StructField('TempHighF', IntegerType(), False),
StructField('TempLowF', IntegerType(), False)
])
data = [
[ 'BLI', date(2021, 4, 3), 52, 43],
[ 'BLI', date(2021, 4, 2), 50, 38],
[ 'BLI', date(2021, 4, 1), 52, 41],
[ 'PDX', date(2021, 4, 3), 64, 45],
[ 'PDX', date(2021, 4, 2), 61, 41],
[ 'PDX', date(2021, 4, 1), 66, 39],
[ 'SEA', date(2021, 4, 3), 57, 43],
[ 'SEA', date(2021, 4, 2), 54, 39],
[ 'SEA', date(2021, 4, 1), 56, 41]
]
temps = spark.createDataFrame(data, schema)
# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')
# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
"GROUP BY AirportCode, Date, TempHighF, TempLowF " \
"ORDER BY TempHighF DESC")
df_temps.show()
# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode| Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# | PDX|2021-04-03| 64| 45|
# | PDX|2021-04-02| 61| 41|
# | SEA|2021-04-03| 57| 43|
# | SEA|2021-04-02| 54| 39|
# +-----------+----------+---------+--------+
# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')
Uwaga
W poniższym przykładzie opisano sposób pisania kodu przenośnego między usługą Databricks Połączenie dla środowiska Databricks Runtime 13.3 LTS i nowszych w środowiskach, w których DatabricksSession
klasa jest niedostępna.
W poniższym przykładzie DatabricksSession
użyto klasy lub użyto SparkSession
klasy , jeśli DatabricksSession
klasa jest niedostępna, aby wysłać zapytanie do określonej tabeli i zwrócić pierwsze 5 wierszy. W tym przykładzie użyto zmiennej środowiskowej SPARK_REMOTE
do uwierzytelniania.
from pyspark.sql import SparkSession, DataFrame
def get_spark() -> SparkSession:
try:
from databricks.connect import DatabricksSession
return DatabricksSession.builder.getOrCreate()
except ImportError:
return SparkSession.builder.getOrCreate()
def get_taxis(spark: SparkSession) -> DataFrame:
return spark.read.table("samples.nyctaxi.trips")
get_taxis(get_spark()).show(5)
Opinia
https://aka.ms/ContentUserFeedback.
Dostępne już wkrótce: W 2024 r. będziemy stopniowo wycofywać zgłoszenia z serwisu GitHub jako mechanizm przesyłania opinii na temat zawartości i zastępować go nowym systemem opinii. Aby uzyskać więcej informacji, sprawdź:Prześlij i wyświetl opinię dla