Sdílet prostřednictvím


Databáze dotazu používající JDBC

Azure Databricks podporuje připojení k externím databázím pomocí JDBC. Tento článek obsahuje základní syntaxi pro konfiguraci a použití těchto připojení s příklady v Pythonu, SQL a Scala.

Důležité

Konfigurace popsané v tomto článku jsou experimentální. Experimentální funkce jsou poskytovány tak, jak jsou, a Databricks je nepodporuje prostřednictvím technické podpory zákazníků. Pokud chcete získat plnou podporu federace dotazů, měli byste místo toho použít Lakehouse Federation, která uživatelům Azure Databricks umožňuje využívat syntaxi katalogu Unity a nástroje zásad správného řízení dat.

Partner Connect poskytuje optimalizované integrace pro synchronizaci dat s mnoha externími zdroji dat. Podívejte se, co je Databricks Partner Connect?

Důležité

Příklady v tomto článku nezahrnují uživatelská jména a hesla do adres URL JDBC. Databricks doporučuje používat tajné kódy k ukládání přihlašovacích údajů databáze. Příklad:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

Pokud chcete odkazovat na tajné kódy Databricks s SQL, musíte během využití clusteru nakonfigurovat vlastnost konfigurace Sparku.

Úplný příklad správy tajných kódů najdete v příkladu pracovního postupu tajného kódu.

Čtení dat pomocí JDBC

Musíte nakonfigurovat řadu nastavení pro čtení dat pomocí JDBC. Všimněte si, že každá databáze používá jiný formát .<jdbc-url>

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Spark automaticky načte schéma z databázové tabulky a mapuje jeho typy zpět na typy Spark SQL.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

Můžete spouštět dotazy na tuto tabulku JDBC:

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Zápis dat pomocí JDBC

Ukládání dat do tabulek pomocí JDBC používá podobné konfigurace ke čtení. Prohlédněte si následující příklad:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Výchozí chování se pokusí vytvořit novou tabulku a vyvolá chybu, pokud tabulka s tímto názvem již existuje.

Data můžete k existující tabulce připojit pomocí následující syntaxe:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

Existující tabulku můžete přepsat pomocí následující syntaxe:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Řízení paralelismu pro dotazy JDBC

Ovladač JDBC ve výchozím nastavení dotazuje zdrojovou databázi pouze s jedním vláknem. Pokud chcete zvýšit výkon čtení, musíte zadat řadu možností, které určují, kolik souběžných dotazů Azure Databricks provádí ve vaší databázi. U malých clusterů nastavíte numPartitions možnost rovnající se počtu jader exekutoru v clusteru, aby všechny uzly dotazovávali data paralelně.

Upozorňující

Nastavení numPartitions vysoké hodnoty ve velkém clusteru může mít za následek negativní výkon vzdálené databáze, protože příliš mnoho souběžných dotazů může službu zahltit. To je obzvláště problematické pro aplikační databáze. Buďte opatrní při nastavování této hodnoty nad 50.

Poznámka:

Urychlíte dotazy výběrem sloupce s indexem vypočítaným ve zdrojové databázi .partitionColumn

Následující příklad kódu ukazuje konfiguraci paralelismu pro cluster s osmi jádry:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Poznámka:

Azure Databricks podporuje všechny možnosti Apache Sparku pro konfiguraci JDBC.

Při zápisu do databází pomocí JDBC používá Apache Spark počet oddílů v paměti k řízení paralelismu. Před zápisem do správy paralelismu můžete data předělovat. Vyhněte se velkému počtu oddílů ve velkých clusterech, abyste se vyhnuli zahlcení vzdálené databáze. Následující příklad ukazuje dělení na osm oddílů před zápisem:

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Nasdílení dotazu do databázového stroje

Do databáze můžete odeslat celý dotaz a vrátit jenom výsledek. Parametr table identifikuje tabulku JDBC, která se má přečíst. V klauzuli dotazu FROM SQL můžete použít cokoli, co je platné.

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Řízení počtu načtených řádků na dotaz

Ovladače JDBC mají fetchSize parametr, který řídí počet řádků načtených najednou ze vzdálené databáze.

Nastavení Výsledek
Příliš nízká Vysoká latence kvůli velkému počtu zaokrouhlení (několik řádků vrácených na dotaz)
Příliš vysoká Chyba nedostatku paměti (příliš mnoho dat vrácených v jednom dotazu)

Optimální hodnota je závislá na úloze. Mezi důležité informace patří:

  • Kolik sloupců dotaz vrátí?
  • Jaké datové typy se vrátí?
  • Jak dlouho se vrátí řetězce v jednotlivých sloupcích?

Systémy můžou mít velmi malé výchozí nastavení a těžit z ladění. Například: Výchozí hodnota fetchSize Oracle je 10. Když ho zvýšíte na 100, sníží se celkový počet dotazů, které je potřeba provést faktorem 10. Výsledky JDBC jsou síťový provoz, takže vyhněte se velmi velkým číslům, ale optimální hodnoty můžou být v tisících pro mnoho datových sad.

fetchSize Použijte možnost, jak je znázorněno v následujícím příkladu:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()