Samouczek: ładowanie i przekształcanie danych przy użyciu ramek danych platformy Apache Spark
W tym samouczku pokazano, jak ładować i przekształcać dane przy użyciu interfejsu API ramki danych platformy Apache Spark (PySpark), interfejsu API ramki danych Platformy Apache Spark Scala oraz interfejsu API SparkDataFrame sparkR w usłudze Azure Databricks.
Po ukończeniu tego samouczka dowiesz się, czym jest ramka danych i zapoznasz się z następującymi zadaniami:
Python
- Definiowanie zmiennych i kopiowanie danych publicznych do woluminu wykazu aparatu Unity
- Tworzenie ramki danych przy użyciu języka Python
- Ładowanie danych do ramki danych z pliku CSV
- Wyświetlanie ramki danych i interakcja z nią
- Zapisywanie ramki danych
- Uruchamianie zapytań SQL w narzędziu PySpark
Zobacz też dokumentację interfejsu API platformy Apache Spark PySpark.
Scala
- Definiowanie zmiennych i kopiowanie danych publicznych do woluminu wykazu aparatu Unity
- Tworzenie ramki danych za pomocą języka Scala
- Ładowanie danych do ramki danych z pliku CSV
- Wyświetlanie ramki danych i interakcja z nią
- Zapisywanie ramki danych
- Uruchamianie zapytań SQL na platformie Apache Spark
Zobacz też Dokumentację interfejsu API języka Scala platformy Apache Spark.
R
- Definiowanie zmiennych i kopiowanie danych publicznych do woluminu wykazu aparatu Unity
- Tworzenie elementów SparkR SparkDataFrames
- Ładowanie danych do ramki danych z pliku CSV
- Wyświetlanie ramki danych i interakcja z nią
- Zapisywanie ramki danych
- Uruchamianie zapytań SQL w usłudze SparkR
Zobacz też dokumentację interfejsu API platformy Apache SparkR.
Co to jest ramka danych?
Ramka danych to dwuwymiarowa struktura danych z kolumnami potencjalnie różnych typów. Ramka danych może być podobna do arkusza kalkulacyjnego, tabeli SQL lub słownika obiektów serii. Ramki danych platformy Apache Spark zapewniają bogaty zestaw funkcji (wybieranie kolumn, filtrowanie, sprzężenie, agregowanie), które umożliwiają efektywne rozwiązywanie typowych problemów z analizą danych.
Ramki danych platformy Apache Spark to abstrakcja oparta na odpornych rozproszonych zestawach danych (RDD). Ramki danych platformy Spark i platforma Spark SQL używają ujednoliconego aparatu planowania i optymalizacji, co pozwala uzyskać niemal identyczną wydajność we wszystkich obsługiwanych językach w usłudze Azure Databricks (Python, SQL, Scala i R).
Wymagania
Aby wykonać czynności opisane w poniższym samouczku, musisz spełnić następujące wymagania:
Aby użyć przykładów w tym samouczku, obszar roboczy musi mieć włączony wykaz aparatu Unity.
Przykłady w tym samouczku używają woluminu wykazu aparatu Unity do przechowywania przykładowych danych. Aby użyć tych przykładów, utwórz wolumin i użyj wykazu, schematu i nazw woluminów, aby ustawić ścieżkę woluminu używaną przez przykłady.
Musisz mieć następujące uprawnienia w katalogu aparatu Unity:
READ VOLUME
iWRITE VOLUME
, lubALL PRIVILEGES
dla woluminu używanego na potrzeby tego samouczka.USE SCHEMA
lubALL PRIVILEGES
schemat używany na potrzeby tego samouczka.USE CATALOG
lubALL PRIVILEGES
katalogu używanego na potrzeby tego samouczka.
Aby ustawić te uprawnienia, zobacz uprawnienia administratora usługi Databricks lub katalogu aparatu Unity oraz zabezpieczane obiekty.
Napiwek
Aby zapoznać się z ukończonym notesem dla tego artykułu, zobacz Notesy samouczka dotyczące ramki danych.
Krok 1. Definiowanie zmiennych i ładowanie pliku CSV
Ten krok definiuje zmienne do użycia w tym samouczku, a następnie ładuje plik CSV zawierający dane nazwy dziecka z health.data.ny.gov do woluminu wykazu aparatu Unity.
Otwórz nowy notes, klikając ikonę . Aby dowiedzieć się, jak nawigować po notesach usługi Azure Databricks, zobacz Interfejs i kontrolki notesu usługi Databricks.
Skopiuj i wklej następujący kod do nowej pustej komórki notesu. Zastąp
<catalog-name>
wartości ,<schema-name>
i<volume-name>
nazwami wykazu, schematu i woluminu dla woluminu wykazu aparatu Unity. Zastąp<table_name>
wybraną nazwą tabeli. Dane nazwy dziecka zostaną załadowane do tej tabeli w dalszej części tego samouczka.Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete path
Naciśnij
Shift+Enter
, aby uruchomić komórkę i utworzyć nową pustą komórkę.Skopiuj i wklej następujący kod do nowej pustej komórki notesu. Ten kod kopiuje
rows.csv
plik z health.data.ny.gov do woluminu wykazu aparatu Unity przy użyciu polecenia dbutuils usługi Databricks .Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Naciśnij ,
Shift+Enter
aby uruchomić komórkę, a następnie przejdź do następnej komórki.
Krok 2. Tworzenie ramki danych
Ten krok tworzy ramkę danych o nazwie z df1
danymi testowymi, a następnie wyświetla jej zawartość.
Skopiuj i wklej następujący kod do nowej pustej komórki notesu. Ten kod tworzy ramkę danych z danymi testowymi, a następnie wyświetla zawartość i schemat ramki danych.
Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
Naciśnij ,
Shift+Enter
aby uruchomić komórkę, a następnie przejdź do następnej komórki.
Krok 3. Ładowanie danych do ramki danych z pliku CSV
W tym kroku zostanie utworzona ramka danych o nazwie df_csv
z pliku CSV, który został wcześniej załadowany do woluminu wykazu aparatu Unity. Zobacz spark.read.csv.
Skopiuj i wklej następujący kod do nowej pustej komórki notesu. Ten kod ładuje dane baby name do ramki danych z pliku CSV, a następnie wyświetla zawartość ramki
df_csv
danych.Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
Naciśnij ,
Shift+Enter
aby uruchomić komórkę, a następnie przejdź do następnej komórki.
Dane można załadować z wielu obsługiwanych formatów plików.
Krok 4. Wyświetlanie ramki danych i interakcja z nią
Wyświetlaj ramki danych dla dzieci i wchodzić z nimi w interakcje, korzystając z poniższych metod.
Drukowanie schematu ramki danych
Dowiedz się, jak wyświetlić schemat ramki danych platformy Apache Spark. Platforma Apache Spark używa terminu schema do odwoływania się do nazw i typów danych kolumn w ramce danych.
Uwaga
Usługa Azure Databricks używa również terminu schema do opisania kolekcji tabel zarejestrowanych w wykazie.
Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod przedstawia schemat ramek danych z
.printSchema()
metodą wyświetlania schematów dwóch ramek danych — w celu przygotowania do połączenia dwóch ramek danych.Python
df_csv.printSchema() df1.printSchema()
Scala
dfCsv.printSchema() df1.printSchema()
R
printSchema(df_csv) printSchema(df1)
Naciśnij ,
Shift+Enter
aby uruchomić komórkę, a następnie przejdź do następnej komórki.
Zmienianie nazwy kolumny w ramce danych
Dowiedz się, jak zmienić nazwę kolumny w ramce danych.
Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod zmienia nazwę kolumny w ramce
df1_csv
danych, aby dopasować je do odpowiedniej kolumny w ramcedf1
danych. Ten kod używa metody Apache SparkwithColumnRenamed()
.Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema
Scala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)
Naciśnij ,
Shift+Enter
aby uruchomić komórkę, a następnie przejdź do następnej komórki.
Łączenie ramek danych
Dowiedz się, jak utworzyć nową ramkę danych, która dodaje wiersze jednej ramki danych do innej.
Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod używa metody Apache Spark
union()
, aby połączyć zawartość pierwszej ramkidf
danych z ramą danychdf_csv
zawierającą dane nazw dzieci załadowanych z pliku CSV.Python
df = df1.union(df_csv) display(df)
Scala
val df = df1.union(dfCsvRenamed) display(df)
R
display(df <- union(df1, df_csv))
Naciśnij ,
Shift+Enter
aby uruchomić komórkę, a następnie przejdź do następnej komórki.
Filtrowanie wierszy w ramce danych
Odkryj najpopularniejsze nazwy dzieci w zestawie danych, filtrując wiersze przy użyciu platformy Apache Spark .filter()
lub .where()
metod. Użyj filtrowania, aby wybrać podzbiór wierszy, które mają być zwracane lub modyfikowane w ramce danych. Nie ma różnicy w wydajności lub składni, jak pokazano w poniższych przykładach.
Korzystanie z metody .filter()
Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod używa metody Apache Spark
.filter()
do wyświetlania tych wierszy w ramce danych z liczbą większą niż 50.Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
Naciśnij ,
Shift+Enter
aby uruchomić komórkę, a następnie przejdź do następnej komórki.
Używanie metody .where()
Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod używa metody Apache Spark
.where()
do wyświetlania tych wierszy w ramce danych z liczbą większą niż 50.Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
Naciśnij ,
Shift+Enter
aby uruchomić komórkę, a następnie przejdź do następnej komórki.
Wybieranie kolumn z ramki danych i kolejność według częstotliwości
Dowiedz się, która częstotliwość nazw dziecka z select()
metodą określa kolumny z ramki danych do zwrócenia. Użyj platformy Apache Spark orderby
i desc
funkcji, aby uporządkować wyniki.
Moduł pyspark.sql dla platformy Apache Spark zapewnia obsługę funkcji SQL. Wśród tych funkcji używanych w tym samouczku są funkcje apache Spark orderBy()
, desc()
i expr()
. Korzystanie z tych funkcji można włączyć, importując je do sesji zgodnie z potrzebami.
Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod importuje
desc()
funkcję, a następnie używa metody Apache Spark i platformy Apache Sparkselect()
orderBy()
orazdesc()
funkcji do wyświetlania najbardziej typowych nazw i ich liczby w kolejności malejącej.Python
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
Naciśnij ,
Shift+Enter
aby uruchomić komórkę, a następnie przejdź do następnej komórki.
Tworzenie podzestawu ramki danych
Dowiedz się, jak utworzyć podzestaw ramki danych na podstawie istniejącej ramki danych.
Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod używa metody Apache Spark
filter
, aby utworzyć nową ramkę danych ograniczającą dane według roku, liczby i płci. Używa metody Apache Sparkselect()
, aby ograniczyć kolumny. Używa również platformy Apache SparkorderBy()
idesc()
funkcji do sortowania nowej ramki danych według liczby.Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)
Naciśnij ,
Shift+Enter
aby uruchomić komórkę, a następnie przejdź do następnej komórki.
Krok 5. Zapisywanie ramki danych
Dowiedz się, jak zapisać ramkę danych. Możesz zapisać ramkę danych w tabeli lub zapisać ramkę danych w pliku lub wielu plikach.
Zapisywanie ramki danych w tabeli
Usługa Azure Databricks domyślnie używa formatu usługi Delta Lake dla wszystkich tabel. Aby zapisać ramkę danych, musisz mieć CREATE
uprawnienia tabeli w katalogu i schemacie.
Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod zapisuje zawartość ramki danych w tabeli przy użyciu zmiennej zdefiniowanej na początku tego samouczka.
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
Naciśnij ,
Shift+Enter
aby uruchomić komórkę, a następnie przejdź do następnej komórki.
Większość aplikacji platformy Apache Spark działa na dużych zestawach danych i w sposób rozproszony. Platforma Apache Spark zapisuje katalog plików, a nie jeden plik. Usługa Delta Lake dzieli foldery Parquet i pliki. Wiele systemów danych może odczytywać te katalogi plików. Usługa Azure Databricks zaleca używanie tabel przez ścieżki plików dla większości aplikacji.
Zapisywanie ramki danych w plikach JSON
Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod zapisuje ramkę danych w katalogu plików JSON.
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
Naciśnij ,
Shift+Enter
aby uruchomić komórkę, a następnie przejdź do następnej komórki.
Odczytywanie ramki danych z pliku JSON
Dowiedz się, jak używać metody Apache Spark spark.read.format()
do odczytywania danych JSON z katalogu do ramki danych.
Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod wyświetla pliki JSON zapisane w poprzednim przykładzie.
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
Naciśnij ,
Shift+Enter
aby uruchomić komórkę, a następnie przejdź do następnej komórki.
Dodatkowe zadania: Uruchamianie zapytań SQL w PySpark, Scala i R
Ramki danych platformy Apache Spark udostępniają następujące opcje łączenia języka SQL z narzędziami PySpark, Scala i R. Poniższy kod można uruchomić w tym samym notesie, który został utworzony na potrzeby tego samouczka.
Określanie kolumny jako zapytania SQL
Dowiedz się, jak używać metody Apache Spark selectExpr()
. Jest to wariant select()
metody, która akceptuje wyrażenia SQL i zwraca zaktualizowaną ramkę danych. Ta metoda umożliwia użycie wyrażenia SQL, takiego jak upper
.
Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod używa metody Apache Spark
selectExpr()
i wyrażenia SQLupper
, aby przekonwertować kolumnę ciągu na wielkie litery (i zmienić nazwę kolumny).Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
Naciśnij ,
Shift+Enter
aby uruchomić komórkę, a następnie przejdź do następnej komórki.
Użyj expr()
polecenia , aby użyć składni SQL dla kolumny
Dowiedz się, jak importować i używać funkcji Apache Spark expr()
do używania składni SQL w dowolnym miejscu, w jakim zostanie określona kolumna.
Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod importuje
expr()
funkcję, a następnie używa funkcji Apache Sparkexpr()
i wyrażenia SQLlower
, aby przekonwertować kolumnę ciągu na małe litery (i zmienić nazwę kolumny).Python
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
Naciśnij ,
Shift+Enter
aby uruchomić komórkę, a następnie przejdź do następnej komórki.
Uruchamianie dowolnego zapytania SQL przy użyciu funkcji spark.sql()
Dowiedz się, jak używać funkcji Apache Spark spark.sql()
do uruchamiania dowolnych zapytań SQL.
Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod używa funkcji Apache Spark
spark.sql()
do wykonywania zapytań dotyczących tabeli SQL przy użyciu składni SQL.Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
Naciśnij ,
Shift+Enter
aby uruchomić komórkę, a następnie przejdź do następnej komórki.
Notesy samouczka dotyczące ramki danych
Poniższe notesy zawierają przykłady zapytań z tego samouczka.