Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Den här självstudien visar hur du laddar och transformerar data med Apache Spark Python (PySpark) DataFrame API, Apache Spark Scala DataFrame API och SparkR SparkDataFrame API i Azure Databricks.
Kommentar
Om du använder Databricks Free Edition väljer du fliken Python för alla kodexempel i den här självstudien. Free Edition stöder inte R eller Scala. Dessutom begränsar Free Edition utgående internetåtkomst, så du måste ladda upp CSV-filen med hjälp av arbetsytans användargränssnitt i stället för att ladda ned den med kod. Mer information finns i Steg 1 .
I slutet av den här självstudien kommer du att förstå vad en DataFrame är och känna till följande uppgifter:
python
- Definiera variabler och kopiera offentliga data till en Unity Catalog-volym
- Skapa en dataram med Python
- Läs in data i en DataFrame från en CSV-fil
- Visa och interagera med en DataFrame
- Spara dataramen
- Köra SQL-frågor i PySpark
Se även Apache Spark PySpark API-referens.
Scala
- Definiera variabler och kopiera offentliga data till en Unity Catalog-volym
- Skapa en dataram med Scala
- Ladda in data i en DataFrame från en CSV-fil
- Visa och interagera med en DataFrame
- Spara dataramen
- Köra SQL-frågor i Apache Spark
Se även Apache Spark Scala API-referens.
R
- Definiera variabler och kopiera offentliga data till en Unity Catalog-volym
- Skapa en SparkR SparkDataFrame
- Ladda in data i en DataFrame från en CSV-fil
- Visa och interagera med en DataFrame
- Spara dataramen
- Köra SQL-frågor i SparkR
Se även Apache SparkR API-referens.
Vad är en DataFrame?
En DataFrame är en tvådimensionell etiketterad datastruktur med kolumner av potentiellt olika typer. Du kan tänka dig en DataFrame som ett kalkylblad, en SQL-tabell eller en ordlista med serieobjekt. Apache Spark DataFrames tillhandahåller en omfattande uppsättning funktioner (välj kolumner, filtrera, koppla, aggregera) som gör att du kan lösa vanliga dataanalysproblem effektivt.
Apache Spark DataFrames är en abstraktion som bygger på Resilient Distributed Datasets (RDD). Spark DataFrames och Spark SQL använder en enhetlig planerings- och optimeringsmotor så att du kan få nästan identiska prestanda för alla språk som stöds på Azure Databricks (Python, SQL, Scala och R).
Krav
För att slutföra följande självstudie måste du uppfylla följande krav:
Om du vill använda exemplen i den här självstudien måste arbetsytan ha Unity Catalog aktiverat. Azure Databricks Free Edition och kostnadsfria utvärderingsarbetsytor har Unity Catalog aktiverat som standard.
Exemplen i den här självstudien använder en volym i Unity Catalog
för att lagra exempeldata. Om du vill använda dessa exempel skapar du en volym och använder volymens katalog-, schema- och volymnamn för att ange den volymsökväg som används av exemplen. Free Edition-användare har åtkomst till arbetsytekatalogen defaultoch schemat som standard.Du måste ha följande behörigheter i Unity Catalog:
-
READ VOLUMEochWRITE VOLUMEför volymen som används för den här handledningen -
USE SCHEMAför schemat som används för den här handledningen -
USE CATALOGför katalogen som används för denna handledning
Information om hur du anger dessa behörigheter finns i administratörsbehörigheter för Azure Databricks eller Unity Catalog och skyddsbara objekt. Free Edition-användare har dessa behörigheter i arbetsytekatalogen och
defaultschemat som standard.-
Tips
Se den färdiga notebooken för denna artikel under DataFrame handledningsnotebooks.
Steg 1: Definiera variabler och läsa in CSV-fil
Det här steget definierar variabler för användning i denna handledning och läser sedan in en CSV-fil som innehåller babynamnsdata från health.data.ny.gov till din Unity Catalog-volym. Du behöver namnen på en Unity Catalog-katalog, ett schema och en volym.
Tips
Om du inte känner till katalog- och schemanamnen klickar du på Katalog i sidofältet. Arbetsytekatalogen delar ett namn med din arbetsyta och visas i katalogpanelen. Expandera den för att se tillgängliga scheman. Free Edition och kostnadsfria utvärderingsanvändare kan använda arbetsytekatalogen
default och schemat.
Om du inte har en volym skapar du en genom att köra följande kommando i en notebook-cell (ersätt <catalog_name> och <schema_name> med dina värden):
CREATE VOLUME IF NOT EXISTS <catalog_name>.<schema_name>.my_volume
Öppna en ny anteckningsbok genom att
klicka på ikonen. För att lära dig hur du navigerar i Azure Databricks notebooks, se Anpassa notebook-utseende.Kopiera och klistra in följande kod i den nya tomma notebook-cellen. Ersätt
<catalog-name>,<schema-name>och<volume-name>med katalog-, schema- och volymnamnen för en Unity Catalog-volym. Ersätt<table_name>med ett valfritt tabellnamn. Du laddar in barnnamnsdata i den här tabellen senare i den här handledningen.python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete pathScala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete pathR
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete pathTryck
Shift+Enterför att köra cellen och skapa en ny tom cell.Ladda CSV-filen till din volym. Använd någon av följande metoder:
- Ladda upp med arbetsytans användargränssnitt – Använd den här metoden om du använder Databricks Free Edition, eller om kodhämtningen i alternativ B misslyckas med ett nätverksfel. Free Edition och andra serverlösa beräkningsmiljöer begränsar utgående Internetåtkomst, så du måste ladda upp filen från den lokala datorn.
- Ladda ned med hjälp av kod – Använd den här metoden om din beräkningsmiljö har utgående Internetåtkomst.
Alternativ A: Ladda upp med hjälp av arbetsytans användargränssnitt
- Öppna health.data.ny.gov/api/views/jxy9-yhdk/rows.csv i webbläsaren på den lokala datorn. Filen laddas ned till datorn som
rows.csv, vilket matchar variabelnfile_namesom definierades tidigare. - Gå tillbaka till din Azure Databricks-arbetsyta. I sidofältet klickar du på
Ny > lägg till eller ladda upp data. - Klicka på Ladda upp filer till en volym.
- Klicka på Bläddra och välj
rows.csvfilen eller dra och släpp den i uppladdningsområdet. - Under Målvolym väljer du den volym som du angav ovan.
- När uppladdningen är klar går du tillbaka till anteckningsboken och fortsätter med steg 2.
Mer information om hur du laddar upp filer finns i Ladda upp filer till en Unity Catalog-volym.
Alternativ B: Ladda ned med hjälp av kod
Kopiera och klistra in följande kod i den nya tomma notebook-cellen. Den här koden kopierar
rows.csvfilen från health.data.ny.gov till Unity Catalog-volymen med hjälp av kommandot Databricks dbutils . TryckShift+Enterför att köra cellen och flytta sedan till nästa cell.python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Steg 2: Skapa en dataram
Det här steget skapar en DataFrame med namnet df1 med testdata och visar sedan dess innehåll.
Kopiera och klistra in följande kod i den nya tomma notebook-cellen. Den här koden skapar DataFrame med testdata och visar sedan innehållet och schemat för DataFrame.
python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] # highlight-next-line df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") // highlight-next-line val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) # highlight-next-line df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.Tryck
Shift+Enterför att köra cellen och flytta sedan till nästa cell.
Steg 3: Läsa in data i en DataFrame från CSV-fil
Det här steget skapar en DataFrame med namnet df_csv från CSV-filen som du tidigare läste in i Unity Catalog-volymen. Se spark.read.csv.
Kopiera och klistra in följande kod i den nya tomma notebook-cellen. Den här koden läser in babynamnsdata i DataFrame
df_csvfrån CSV-filen och visar sedan innehållet i DataFrame.python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)Scala
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)Tryck
Shift+Enterför att köra cellen och flytta sedan till nästa cell.
Du kan läsa in data från många filformat som stöds.
Steg 4: Visa och interagera med din DataFrame
Visa och interagera med dina babynamn-DataFrames med hjälp av följande metoder.
Skriva ut DataFrame-schemat
Lär dig hur du visar schemat för en Apache Spark DataFrame. Apache Spark använder termen schema för att referera till namnen och datatyperna för kolumnerna i DataFrame.
Kommentar
Azure Databricks använder också termschemat för att beskriva en samling tabeller som är registrerade i en katalog.
Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden visar schemat för dina DataFrames med metoden
.printSchema()för att visa scheman för båda DataFrames - för att förbereda att unionera dem.python
df_csv.printSchema() df1.printSchema()Scala
dfCsv.printSchema() df1.printSchema()R
printSchema(df_csv) printSchema(df1)Tryck
Shift+Enterför att köra cellen och flytta sedan till nästa cell.
Byt namn på kolumnen i DataFrame
Lär dig hur du byter namn på en kolumn i en DataFrame.
Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden byter namn på en kolumn i
df1_csvDataFrame så att den matchar respektive kolumn idf1DataFrame. Den här koden använder Apache Spark-metodenwithColumnRenamed().python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema()Scala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)Tryck
Shift+Enterför att köra cellen och flytta sedan till nästa cell.
Kombinera DataFrames
Lär dig hur du skapar en ny DataFrame som lägger till raderna i en DataFrame till en annan.
Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden använder Apache Spark-metoden
union()för att kombinera innehållet i din första DataFramedfmed DataFramedf_csvsom innehåller babynamndata som lästs in från CSV-filen.python
df = df1.union(df_csv) display(df)Scala
val df = df1.union(dfCsvRenamed) display(df)R
display(df <- union(df1, df_csv))Tryck
Shift+Enterför att köra cellen och flytta sedan till nästa cell.
Filtrera rader i en dataram
Identifiera de mest populära babynamnen i datauppsättningen genom att filtrera rader med hjälp av Apache Spark-.filter() eller .where() metoder. Använd filtrering för att välja en delmängd rader som ska returneras eller ändras i en DataFrame. Det finns ingen skillnad i prestanda eller syntax, som du ser i följande exempel.
Använda metoden .filter()
Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden använder Apache Spark-metoden
.filter()för att visa dessa rader i DataFrame med ett antal på mer än 50.python
display(df.filter(df["Count"] > 50))Scala
display(df.filter(df("Count") > 50))R
display(filteredDF <- filter(df, df$Count > 50))Tryck
Shift+Enterför att köra cellen och flytta sedan till nästa cell.
Använda .where()-metod
Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden använder Apache Spark-metoden
.where()för att visa dessa rader i DataFrame med ett antal på mer än 50.python
display(df.where(df["Count"] > 50))Scala
display(df.where(df("Count") > 50))R
display(filtered_df <- where(df, df$Count > 50))Tryck
Shift+Enterför att köra cellen och flytta sedan till nästa cell.
Välj kolumner från en DataFrame och sortera efter frekvens
Lär dig mer om vilken babynamnsfrekvens med metoden select() för att ange kolumnerna från DataFrame som ska returneras. Använd Apache Spark orderby och desc funktioner för att ordna resultatet.
Modulen pyspark.sql för Apache Spark ger stöd för SQL-funktioner. Bland de här funktionerna som vi använder i den här självstudien finns funktionerna Apache Spark orderBy(), desc()och expr() . Du aktiverar användningen av dessa funktioner genom att importera dem till sessionen efter behov.
Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden importerar
desc()funktionen och använder sedan Apache Spark-metodenselect()och Apache SparkorderBy()ochdesc()funktioner för att visa de vanligaste namnen och deras antal i fallande ordning.python
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))Scala
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))Tryck
Shift+Enterför att köra cellen och flytta sedan till nästa cell.
Skapa en delmängd av DataFrame
Lär dig hur du skapar en delmängd av DataFrame från en befintlig DataFrame.
Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden använder Apache Spark-metoden
filterför att skapa en ny DataFrame som begränsar data efter år, antal och kön. Den använder metoden Apache Sparkselect()för att begränsa kolumnerna. Den använder också Apache SparkorderBy()ochdesc()funktioner för att sortera den nya DataFrame efter antal.python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)Tryck
Shift+Enterför att köra cellen och flytta sedan till nästa cell.
Steg 5: Spara dataramen
Lär dig hur du sparar en DataFrame. Du kan antingen spara dataramen i en tabell eller skriva DataFrame till en fil eller flera filer.
Spara DataFrame i en tabell
Azure Databricks använder Delta Lake-formatet för alla tabeller som standard. Om du vill spara dataramen måste du ha CREATE tabellbehörigheter i katalogen och schemat.
Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden sparar innehållet i DataFrame till en tabell med hjälp av variabeln som du definierade i början av den här självstudien.
python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")Tryck
Shift+Enterför att köra cellen och flytta sedan till nästa cell.
De flesta Apache Spark-program fungerar på stora datamängder och på ett distribuerat sätt. Apache Spark skriver ut en katalog med filer i stället för en enda fil. Delta Lake delar upp Parquet-mapparna och filerna. Många datasystem kan läsa dessa kataloger med filer. Azure Databricks rekommenderar att du använder tabeller över filsökvägar för de flesta program.
Spara DataFrame till JSON-filer
Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden sparar DataFrame till en katalog med JSON-filer.
python
df.write.format("json").mode("overwrite").save("/tmp/json_data")Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")Tryck
Shift+Enterför att köra cellen och flytta sedan till nästa cell.
Läs DataFrame från en JSON-fil
Lär dig hur du använder Apache Spark-metoden spark.read.format() för att läsa JSON-data från en katalog till en DataFrame.
Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden visar JSON-filerna som du sparade i föregående exempel.
python
display(spark.read.format("json").json("/tmp/json_data"))Scala
display(spark.read.format("json").json("/tmp/json_data"))R
display(read.json("/tmp/json_data"))Tryck
Shift+Enterför att köra cellen och flytta sedan till nästa cell.
Ytterligare uppgifter: Köra SQL-frågor i PySpark, Scala och R
Apache Spark DataFrames innehåller följande alternativ för att kombinera SQL med PySpark, Scala och R. Du kan köra följande kod i samma notebook-fil som du skapade för den här självstudien.
Ange en kolumn som en SQL-fråga
Lär dig hur du använder Apache Spark-metoden selectExpr() . Det här är en variant av metoden select() som accepterar SQL-uttryck och returnerar en uppdaterad DataFrame. Med den här metoden kan du använda ett SQL-uttryck, till exempel upper.
Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden använder metoden Apache Spark
selectExpr()och SQLupper-uttrycket för att konvertera en strängkolumn till versaler (och byta namn på kolumnen).python
display(df.selectExpr("Count", "upper(County) as big_name"))Scala
display(df.selectExpr("Count", "upper(County) as big_name"))R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))Tryck
Shift+Enterför att köra cellen och flytta sedan till nästa cell.
Använd expr() för att använda SQL-syntax för en kolumn
Lär dig hur du importerar och använder funktionen Apache Spark expr() för att använda SQL-syntax var som helst där en kolumn skulle anges.
Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden importerar funktionen
expr()och använder sedan funktionen Apache Sparkexpr()samt SQL-uttrycketlowerför att konvertera en strängkolumn till gemener (och byta namn på kolumnen).python
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionalityTryck
Shift+Enterför att köra cellen och flytta sedan till nästa cell.
Köra en godtycklig SQL-fråga med hjälp av funktionen spark.sql()
Lär dig hur du använder Apache Spark-funktionen spark.sql() för att köra godtyckliga SQL-frågor.
Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden använder funktionen Apache Spark
spark.sql()för att köra frågor mot en SQL-tabell med sql-syntax.python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))Tryck
Shift+Enterför att köra cellen och flytta sedan till nästa cell.
DataFrame-handledning-notebookar
Följande notebook-filer innehåller exempelfrågor från den här självstudien.