Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
V tomto kurzu se dozvíte, jak načíst a transformovat data pomocí rozhraní API datového rámce Apache Spark Python (PySpark), rozhraní API datového rámce Apache Spark Scala a rozhraní API SparkR SparkDataFrame v Azure Databricks.
Na konci tohoto kurzu pochopíte, co datový rámec je, a seznámíte se s následujícími úlohami:
Python
- Definování proměnných a kopírování veřejných dat do svazku katalogu Unity
- Vytvoření datového rámce pomocí Pythonu
- Načtení dat do datového rámce ze souboru CSV
- Zobrazení datového rámce a interakce s ním
- Uložení datového rámce
- Spouštění dotazů SQL v PySparku
Viz také referenční informace k rozhraní API Apache Spark PySpark.
Scala
- Definování proměnných a kopírování veřejných dat do svazku katalogu Unity
- Vytvoření datového rámce pomocí scaly
- Načtení dat do datového rámce ze souboru CSV
- Zobrazení datového rámce a interakce s ním
- Uložení datového rámce
- Spouštění dotazů SQL v Apache Sparku
Viz také referenční informace k rozhraní Apache Spark Scala API.
R
- Definování proměnných a kopírování veřejných dat do svazku katalogu Unity
- Vytvoření SparkR SparkDataFrames
- Načtení dat do datového rámce ze souboru CSV
- Zobrazení datového rámce a interakce s ním
- Uložení datového rámce
- Spouštění dotazů SQL v SparkR
Viz také referenční informace k rozhraní Apache SparkR API.
Co je datový rámec?
Datový rámec je dvourozměrná datová struktura s sloupci potenciálně různých typů. Datový rámec si můžete představit jako tabulku, tabulku SQL nebo slovník objektů řady. Datové rámce Apache Spark poskytují bohatou sadu funkcí (výběr sloupců, filtrování, spojení, agregace), které umožňují efektivně řešit běžné problémy s analýzou dat.
Datové rámce Apache Sparku jsou abstrakce založená na odolných distribuovaných datových sadách (RDD). Datové rámce Sparku a Spark SQL používají jednotný modul pro plánování a optimalizaci, který umožňuje získat téměř stejný výkon ve všech podporovaných jazycích v Azure Databricks (Python, SQL, Scala a R).
Požadavky
K dokončení následujícího kurzu musíte splnit následující požadavky:
Pokud chcete použít příklady v tomto kurzu, musí mít váš pracovní prostor povolený Unity Catalog.
Příklady v tomto kurzu používají ke ukládání ukázkových dat svazek katalogu Unity. Pokud chcete tyto příklady použít, vytvořte svazek a použijte katalog, schéma a názvy daného svazku k nastavení cesty svazku, kterou příklady používají.
V katalogu Unity musíte mít následující oprávnění:
-
READ VOLUMEaWRITE VOLUME) neboALL PRIVILEGESpro svazek použitý pro tento kurz. -
USE SCHEMAneboALL PRIVILEGESpro schéma použité v tomto kurzu. -
USE CATALOGneboALL PRIVILEGESpro katalog použitý pro tento kurz.
Pokud chcete tato oprávnění nastavit, podívejte se na správce Databricks nebo oprávnění katalogu Unity a zabezpečitelné objekty.
-
Návod
Dokončený poznámkový blok pro tento článek najdete v poznámkových blocích kurzu datového rámce.
Krok 1: Definování proměnných a načtení souboru CSV
Tento krok definuje proměnné pro použití v tomto kurzu a pak načte soubor CSV obsahující data názvu dítěte z health.data.ny.gov do svazku katalogu Unity.
Kliknutím
na ikonu otevřete nový poznámkový blok. Informace o procházení poznámkových bloků Azure Databricks najdete v tématu Přizpůsobení vzhledu poznámkového bloku.Zkopírujte a vložte následující kód do nové prázdné buňky poznámkového bloku. Nahraďte
<catalog-name>,<schema-name>a<volume-name>katalogem, schématem a názvy svazků pro svazek katalogu Unity. Nahraďte<table_name>názvem tabulky podle vašeho výběru. Data názvu dítěte načtete do této tabulky později v tomto kurzu.Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete pathScala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete pathR
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete pathStisknutím spustíte
Shift+Enterbuňku a vytvoříte novou prázdnou buňku.Zkopírujte a vložte následující kód do nové prázdné buňky poznámkového bloku. Tento kód zkopíruje soubor
rows.csvze health.data.ny.gov do svazku katalogu Unity pomocí příkazu Databricks dbutuils.Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))Stisknutím klávesy
Shift+Enterspusťte buňku a přejděte na další buňku.
Krok 2: Vytvoření datového rámce
Tento krok vytvoří datový rámec s testovacími df1 daty a pak zobrazí jeho obsah.
Zkopírujte a vložte následující kód do nové prázdné buňky poznámkového bloku. Tento kód vytvoří datový rámec s testovacími daty a pak zobrazí obsah a schéma datového rámce.
Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] # highlight-next-line df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") // highlight-next-line val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) # highlight-next-line df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.Stisknutím klávesy
Shift+Enterspusťte buňku a přejděte na další buňku.
Krok 3: Načtení dat do datového rámce ze souboru CSV
Tento krok vytvoří datový rámec s názvem df_csv ze souboru CSV, který jste předtím načetli do svazku katalogu Unity. Viz spark.read.csv.
Zkopírujte a vložte následující kód do nové prázdné buňky poznámkového bloku. Tento kód načte data názvu dítěte do datového rámce
df_csvze souboru CSV a pak zobrazí obsah datového rámce.Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)Scala
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)Stisknutím klávesy
Shift+Enterspusťte buňku a přejděte na další buňku.
Můžete načíst data z mnoha podporovaných formátů souborů.
Krok 4: Zobrazení datového rámce a interakce s ním
Pomocí následujících metod můžete zobrazit datové rámce s názvy dětí a pracovat s nimi.
Tisk schématu datového rámce
Zjistěte, jak zobrazit schéma datového rámce Apache Spark. Apache Spark používá termín schéma k označení názvů a datových typů sloupců v DataFrame.
Poznámka:
Azure Databricks také používá schéma termínů k popisu kolekce tabulek zaregistrovaných v katalogu.
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód ukazuje schéma datových rámců s metodou
.printSchema()pro zobrazení schémat dvou datových rámců – pro přípravu na sjednocení těchto dvou datových rámců.Python
df_csv.printSchema() df1.printSchema()Scala
dfCsv.printSchema() df1.printSchema()R
printSchema(df_csv) printSchema(df1)Stisknutím klávesy
Shift+Enterspusťte buňku a přejděte na další buňku.
Přejmenování sloupce v datovém rámci
Zjistěte, jak přejmenovat sloupec v datovém rámci.
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód přejmenuje sloupec v datovém rámci
df1_csvtak, aby odpovídal příslušnému sloupci v datovém rámcidf1. Tento kód používá metodu Apache SparkwithColumnRenamed().Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchemaScala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)Stisknutím klávesy
Shift+Enterspusťte buňku a přejděte na další buňku.
Kombinování datových rámců
Zjistěte, jak vytvořit nový datový rámec, který přidá řádky jednoho datového rámce do druhého.
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá metodu Apache Spark
union()ke kombinování obsahu prvního datového rámcedfs datovým rámcemdf_csvobsahujícím data názvů dětí načtená ze souboru CSV.Python
df = df1.union(df_csv) display(df)Scala
val df = df1.union(dfCsvRenamed) display(df)R
display(df <- union(df1, df_csv))Stisknutím klávesy
Shift+Enterspusťte buňku a přejděte na další buňku.
Filtrování řádků v datovém rámci
Seznamte se s nejoblíbenějšími názvy dětí v sadě dat filtrováním řádků pomocí .filter() Apache Sparku nebo metod .where(). Pomocí filtrování vyberte podmnožinu řádků, které chcete vrátit nebo upravit v datovém rámci. V výkonu nebo syntaxi není žádný rozdíl, jak je vidět v následujících příkladech.
Použití metody .filter()
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá metodu Apache Spark
.filter()k zobrazení těchto řádků v datovém rámci s počtem více než 50.Python
display(df.filter(df["Count"] > 50))Scala
display(df.filter(df("Count") > 50))R
display(filteredDF <- filter(df, df$Count > 50))Stisknutím klávesy
Shift+Enterspusťte buňku a přejděte na další buňku.
Použití metody .where()
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá metodu Apache Spark
.where()k zobrazení těchto řádků v datovém rámci s počtem více než 50.Python
display(df.where(df["Count"] > 50))Scala
display(df.where(df("Count") > 50))R
display(filtered_df <- where(df, df$Count > 50))Stisknutím klávesy
Shift+Enterspusťte buňku a přejděte na další buňku.
Výběr sloupců z datového rámce a pořadí podle frekvence
Přečtěte si, která frekvence názvu dítěte pomocí metody select() určuje sloupce z datového rámce, které se mají vrátit. K seřazení výsledků použijte Apache Spark orderby a desc funkce.
Modul pyspark.sql pro Apache Spark poskytuje podporu funkcí SQL. Mezi tyto funkce, které používáme v tomto kurzu, patří Apache Spark orderBy(), desc()a expr() funkce. Použití těchto funkcí povolíte tak, že je podle potřeby naimportujete do relace.
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód naimportuje
desc()funkci a pak použije metodu Apache Sparkselect()a Apache SparkorderBy()adesc()funkce k zobrazení nejběžnějších názvů a jejich počtu v sestupném pořadí.Python
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))Scala
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))Stisknutím klávesy
Shift+Enterspusťte buňku a přejděte na další buňku.
Vytvoření datového rámce podmnožina
Zjistěte, jak vytvořit podmnožinu datového rámce z existujícího datového rámce.
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá metodu Apache Spark
filterk vytvoření nového datového rámce, který omezuje data podle roku, počtu a pohlaví. K omezení sloupců používá metodu Apache Sparkselect(). Používá také Apache SparkorderBy()adesc()funkce k seřazení nového datového rámce podle počtu.Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)Stisknutím klávesy
Shift+Enterspusťte buňku a přejděte na další buňku.
Krok 5: Uložení datového rámce
Naučte se ukládat datový rámec. Datový rámec můžete uložit do tabulky nebo zapsat datový rámec do souboru nebo do více souborů.
Uložení datového rámce do tabulky
Azure Databricks ve výchozím nastavení používá formát Delta Lake pro všechny tabulky. Pokud chcete uložit DataFrame, musíte mít oprávnění tabulky CREATE v katalogu a schématu.
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód uloží obsah datového rámce do tabulky pomocí proměnné, kterou jste definovali na začátku tohoto kurzu.
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")Stisknutím klávesy
Shift+Enterspusťte buňku a přejděte na další buňku.
Většina aplikací Apache Spark pracuje na velkých datových sadách a distribuovaným způsobem. Apache Spark zapisuje adresář souborů místo jednoho souboru. Delta Lake rozdělí složky a soubory Parquet. Mnoho datových systémů může tyto adresáře souborů číst. Azure Databricks doporučuje používat tabulky přes cesty k souborům pro většinu aplikací.
Uložení datového rámce do souborů JSON
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód uloží datový rámec do adresáře souborů JSON.
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")Stisknutím klávesy
Shift+Enterspusťte buňku a přejděte na další buňku.
Čtení datového rámce ze souboru JSON
Naučte se používat metodu Apache Spark spark.read.format() ke čtení dat JSON z adresáře do datového rámce.
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód zobrazí soubory JSON, které jste uložili v předchozím příkladu.
Python
display(spark.read.format("json").json("/tmp/json_data"))Scala
display(spark.read.format("json").json("/tmp/json_data"))R
display(read.json("/tmp/json_data"))Stisknutím klávesy
Shift+Enterspusťte buňku a přejděte na další buňku.
Další úlohy: Spouštění dotazů SQL v PySpark, Scala a R
Datové rámce Apache Spark nabízejí následující možnosti pro kombinování SQL s PySpark, Scala a R. Následující kód můžete spustit ve stejném poznámkovém bloku, který jste vytvořili pro účely tohoto kurzu.
Zadání sloupce jako dotazu SQL
Naučte se používat metodu Apache Spark selectExpr() . Jedná se o variantu select() metody, která přijímá výrazy SQL a vrací aktualizovaný datový rámec. Tato metoda umožňuje použít výraz SQL, například upper.
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá metodu Apache Spark
selectExpr()a výraz SQLupper, který převádí řetězcový sloupec na velká písmena a přejmenuje sloupec.Python
display(df.selectExpr("Count", "upper(County) as big_name"))Scala
display(df.selectExpr("Count", "upper(County) as big_name"))R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))Stisknutím klávesy
Shift+Enterspusťte buňku a přejděte na další buňku.
Pro použití syntaxe SQL pro sloupec použijte expr()
Zjistěte, jak importovat a používat funkci Apache Spark expr() k použití syntaxe SQL kdekoli, kde by byl zadán sloupec.
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód naimportuje funkci
expr()a pak použije funkciexpr()Apache Sparku a výraz SQLlowerk převodu sloupce řetězce na malá písmena (a přejmenování sloupce).Python
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionalityStisknutím klávesy
Shift+Enterspusťte buňku a přejděte na další buňku.
Spuštění libovolného dotazu SQL pomocí funkce spark.sql()
Naučte se používat funkci Apache Spark spark.sql() ke spouštění libovolných dotazů SQL.
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá funkci Apache Spark
spark.sql()k dotazování tabulky SQL pomocí syntaxe SQL.Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))Stisknutím klávesy
Shift+Enterspusťte buňku a přejděte na další buňku.
Poznámkové bloky k datovým rámcům
Následující poznámkové bloky obsahují příklady dotazů z tohoto kurzu.
Python
Kurz datových rámců s využitím Pythonu
Získejte poznámkový blok
Scala
Kurz datových rámců s využitím scaly
Získejte poznámkový blok
R
Kurz datových rámců s využitím jazyka R
Získejte poznámkový blok