Samouczek: ładowanie i przekształcanie danych przy użyciu ramek danych platformy Apache Spark

W tym samouczku pokazano, jak ładować i przekształcać dane przy użyciu interfejsu API ramki danych platformy Apache Spark (PySpark), interfejsu API ramki danych Platformy Apache Spark Scala oraz interfejsu API SparkDataFrame sparkR w usłudze Azure Databricks.

Po ukończeniu tego samouczka dowiesz się, czym jest ramka danych i zapoznasz się z następującymi zadaniami:

Python

Zobacz też dokumentację interfejsu API platformy Apache Spark PySpark.

Scala

Zobacz też Dokumentację interfejsu API języka Scala platformy Apache Spark.

R

Zobacz też dokumentację interfejsu API platformy Apache SparkR.

Co to jest ramka danych?

Ramka danych to dwuwymiarowa struktura danych z kolumnami potencjalnie różnych typów. Ramka danych może być podobna do arkusza kalkulacyjnego, tabeli SQL lub słownika obiektów serii. Ramki danych platformy Apache Spark zapewniają bogaty zestaw funkcji (wybieranie kolumn, filtrowanie, sprzężenie, agregowanie), które umożliwiają efektywne rozwiązywanie typowych problemów z analizą danych.

Ramki danych platformy Apache Spark to abstrakcja oparta na odpornych rozproszonych zestawach danych (RDD). Ramki danych platformy Spark i platforma Spark SQL używają ujednoliconego aparatu planowania i optymalizacji, co pozwala uzyskać niemal identyczną wydajność we wszystkich obsługiwanych językach w usłudze Azure Databricks (Python, SQL, Scala i R).

Wymagania

Aby wykonać czynności opisane w poniższym samouczku, musisz spełnić następujące wymagania:

  • Aby użyć przykładów w tym samouczku, obszar roboczy musi mieć włączony wykaz aparatu Unity.

  • Przykłady w tym samouczku używają woluminu wykazu aparatu Unity do przechowywania przykładowych danych. Aby użyć tych przykładów, utwórz wolumin i użyj wykazu, schematu i nazw woluminów, aby ustawić ścieżkę woluminu używaną przez przykłady.

  • Musisz mieć następujące uprawnienia w katalogu aparatu Unity:

    • READ VOLUME i WRITE VOLUME, lub ALL PRIVILEGES dla woluminu używanego na potrzeby tego samouczka.
    • USE SCHEMA lub ALL PRIVILEGES schemat używany na potrzeby tego samouczka.
    • USE CATALOG lub ALL PRIVILEGES katalogu używanego na potrzeby tego samouczka.

    Aby ustawić te uprawnienia, zobacz uprawnienia administratora usługi Databricks lub katalogu aparatu Unity oraz zabezpieczane obiekty.

Krok 1. Definiowanie zmiennych i ładowanie pliku CSV

Ten krok definiuje zmienne do użycia w tym samouczku, a następnie ładuje plik CSV zawierający dane nazwy dziecka z health.data.ny.gov do woluminu wykazu aparatu Unity.

  1. Otwórz nowy notes, klikając ikonę Nowa ikona . Aby dowiedzieć się, jak nawigować po notesach usługi Azure Databricks, zobacz Interfejs i kontrolki notesu usługi Databricks.

  2. Skopiuj i wklej następujący kod do nowej pustej komórki notesu. Zastąp <catalog-name>wartości , <schema-name>i <volume-name> nazwami wykazu, schematu i woluminu dla woluminu wykazu aparatu Unity. Zastąp <table_name> wybraną nazwą tabeli. Dane nazwy dziecka zostaną załadowane do tej tabeli w dalszej części tego samouczka.

  3. Naciśnij klawisz Shift+Enter , aby uruchomić komórkę i utworzyć nową pustą komórkę.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_tables = catalog + "." + schema
    print(path_tables) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val file_name = "rows.csv"
    val table_name = "<table_name>"
    val path_volume = s"/Volumes/$catalog/$schema/$volume"
    val path_tables = s"$catalog.$schema.$table_name"
    print(path_volume) // Show the complete path
    print(path_tables) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_tables <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_tables) # Show the complete path
    
  4. Skopiuj i wklej następujący kod do nowej pustej komórki notesu. Ten kod kopiuje rows.csv plik z health.data.ny.gov do woluminu wykazu aparatu Unity przy użyciu polecenia dbutuils usługi Databricks .

  5. Naciśnij klawisz , Shift+Enter aby uruchomić komórkę, a następnie przejdź do następnej komórki.

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
    

    Scala

    dbutils.fs.cp(download_url, s"$path_volume/$file_name")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    

Krok 2. Tworzenie ramki danych

Ten krok tworzy ramkę danych o nazwie z df1 danymi testowymi, a następnie wyświetla jej zawartość.

  1. Skopiuj i wklej następujący kod do nowej pustej komórki notesu. Ten kod tworzy ramkę danych z danymi testowymi, a następnie wyświetla zawartość i schemat ramki danych.

  2. Naciśnij klawisz , Shift+Enter aby uruchomić komórkę, a następnie przejdź do następnej komórki.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = c(2021),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = c(42)
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    

Krok 3. Ładowanie danych do ramki danych z pliku CSV

W tym kroku zostanie utworzona ramka danych o nazwie df_csv z pliku CSV, który został wcześniej załadowany do woluminu wykazu aparatu Unity. Zobacz spark.read.csv.

  1. Skopiuj i wklej następujący kod do nowej pustej komórki notesu. Ten kod ładuje dane baby name do ramki danych z pliku CSV, a następnie wyświetla zawartość ramki df_csv danych.

  2. Naciśnij klawisz , Shift+Enter aby uruchomić komórkę, a następnie przejdź do następnej komórki.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
      header=True,
      inferSchema=True,
      sep=",")
    display(df_csv)
    

    Scala

    val df_csv = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .csv(s"$path_volume/$file_name")
    
    display(df_csv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
      source="csv",
      header = TRUE,
      inferSchema = TRUE,
      delimiter = ",")
    
    display(df_csv)
    

Dane można załadować z wielu obsługiwanych formatów plików.

Krok 4. Wyświetlanie ramki danych i interakcja z nią

Wyświetlaj ramki danych dla dzieci i wchodzić z nimi w interakcje, korzystając z poniższych metod.

Dowiedz się, jak wyświetlić schemat ramki danych platformy Apache Spark. Platforma Apache Spark używa terminu schema do odwoływania się do nazw i typów danych kolumn w ramce danych.

Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod przedstawia schemat ramek danych z .printSchema() metodą wyświetlania schematów dwóch ramek danych — w celu przygotowania do połączenia dwóch ramek danych.

Python

df_csv.printSchema()
df1.printSchema()

Scala

df_csv.printSchema()
df1.printSchema()

R

printSchema(df_csv)
printSchema(df1)

Uwaga

Usługa Azure Databricks używa również terminu schema do opisania kolekcji tabel zarejestrowanych w wykazie.

Zmienianie nazwy kolumny w ramce danych

Dowiedz się, jak zmienić nazwę kolumny w ramce danych.

Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod zmienia nazwę kolumny w ramce df1_csv danych, aby dopasować je do odpowiedniej kolumny w ramce df1 danych. Ten kod używa metody Apache Spark withColumnRenamed() .

Python

df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema

Scala

val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()

R

df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)

Łączenie ramek danych

Dowiedz się, jak utworzyć nową ramkę danych, która dodaje wiersze jednej ramki danych do innej.

Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod używa metody Apache Spark union() , aby połączyć zawartość pierwszej ramki df danych z ramą danych df_csv zawierającą dane nazw dzieci załadowanych z pliku CSV.

Python

df = df1.union(df_csv)
display(df)

Scala

val df = df1.union(df_csv_renamed)
display(df)

R

display(df <- union(df1, df_csv))

Filtrowanie wierszy w ramce danych

Odkryj najpopularniejsze nazwy dzieci w zestawie danych, filtrując wiersze przy użyciu platformy Apache Spark .filter() lub .where() metod. Użyj filtrowania, aby wybrać podzbiór wierszy, które mają być zwracane lub modyfikowane w ramce danych. Nie ma różnicy w wydajności lub składni, jak pokazano w poniższych przykładach.

Korzystanie z metody .filter()

Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod używa metody Apache Spark .filter() do wyświetlania tych wierszy w ramce danych z liczbą większą niż 50.

Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))

Używanie metody .where()

Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod używa metody Apache Spark .where() do wyświetlania tych wierszy w ramce danych z liczbą większą niż 50.

Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))

Wybieranie kolumn z ramki danych i kolejność według częstotliwości

Dowiedz się, która częstotliwość nazw dziecka z select() metodą określa kolumny z ramki danych do zwrócenia. Użyj platformy Apache Spark orderby i desc funkcji, aby uporządkować wyniki.

Moduł pyspark.sql dla platformy Apache Spark zapewnia obsługę funkcji SQL. Wśród tych funkcji używanych w tym samouczku są funkcje apache Spark orderBy(), desc()i expr() . Korzystanie z tych funkcji można włączyć, importując je do sesji zgodnie z potrzebami.

Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod importuje desc() funkcję, a następnie używa metody Apache Spark i platformy Apache Spark select()orderBy() oraz desc() funkcji do wyświetlania najbardziej typowych nazw i ich liczby w kolejności malejącej.

Python

from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

Scala

import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

R

display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))

Tworzenie podzestawu ramki danych

Dowiedz się, jak utworzyć podzestaw ramki danych na podstawie istniejącej ramki danych.

Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod używa metody Apache Spark filter , aby utworzyć nową ramkę danych ograniczającą dane według roku, liczby i płci. Używa metody Apache Spark select() , aby ograniczyć kolumny. Używa również platformy Apache Spark orderBy() i desc() funkcji do sortowania nowej ramki danych według liczby.

Python

subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)

Scala

val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))

display(subsetDF)

R

subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)

Krok 5. Zapisywanie ramki danych

Dowiedz się, jak zapisać ramkę danych. Możesz zapisać ramkę danych w tabeli lub zapisać ramkę danych w pliku lub wielu plikach.

Zapisywanie ramki danych w tabeli

Usługa Azure Databricks domyślnie używa formatu usługi Delta Lake dla wszystkich tabel. Aby zapisać ramkę danych, musisz mieć CREATE uprawnienia tabeli w katalogu i schemacie.

Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod zapisuje zawartość ramki danych w tabeli przy użyciu zmiennej zdefiniowanej na początku tego samouczka.

Python

df.write.saveAsTable(f"{path_tables}" + "." + f"{table_name}")

# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(f"{path_tables}" + "." + f"{table_name}")

Scala

df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")

// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$path_volume" + "." + s"$table_name")

R

saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")

Większość aplikacji platformy Apache Spark działa na dużych zestawach danych i w sposób rozproszony. Platforma Apache Spark zapisuje katalog plików, a nie jeden plik. Usługa Delta Lake dzieli foldery Parquet i pliki. Wiele systemów danych może odczytywać te katalogi plików. Usługa Azure Databricks zaleca używanie tabel przez ścieżki plików dla większości aplikacji.

Zapisywanie ramki danych w plikach JSON

Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod zapisuje ramkę danych w katalogu plików JSON.

Python

df.write.format("json").save("/tmp/json_data")

# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")

Scala

df.write.format("json").save("/tmp/json_data")

// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")

R

write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")

Odczytywanie ramki danych z pliku JSON

Dowiedz się, jak używać metody Apache Spark spark.read.format() do odczytywania danych JSON z katalogu do ramki danych.

Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod wyświetla pliki JSON zapisane w poprzednim przykładzie.

Python

display(spark.read.format("json").json("/tmp/json_data"))

Scala

display(spark.read.format("json").json("/tmp/json_data"))

R

display(read.json("/tmp/json_data"))

Dodatkowe zadania: Uruchamianie zapytań SQL w PySpark, Scala i R

Ramki danych platformy Apache Spark udostępniają następujące opcje łączenia języka SQL z narzędziami PySpark, Scala i R. Poniższy kod można uruchomić w tym samym notesie, który został utworzony na potrzeby tego samouczka.

Określanie kolumny jako zapytania SQL

Dowiedz się, jak używać metody Apache Spark selectExpr() . Jest to wariant select() metody, która akceptuje wyrażenia SQL i zwraca zaktualizowaną ramkę danych. Ta metoda umożliwia użycie wyrażenia SQL, takiego jak upper.

Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod używa metody Apache Spark selectExpr() i wyrażenia SQL upper , aby przekonwertować kolumnę ciągu na wielkie litery (i zmienić nazwę kolumny).

Python

display(df.selectExpr("Count", "upper(County) as big_name"))

Scala

display(df.selectExpr("Count", "upper(County) as big_name"))

R

display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))

Użyj expr() polecenia , aby użyć składni SQL dla kolumny

Dowiedz się, jak importować i używać funkcji Apache Spark expr() do używania składni SQL w dowolnym miejscu, w jakim zostanie określona kolumna.

Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod importuje expr() funkcję, a następnie używa funkcji Apache Spark expr() i wyrażenia SQL lower , aby przekonwertować kolumnę ciągu na małe litery (i zmienić nazwę kolumny).

Python

from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))

Scala

import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function

display(df.select(col("Count"), expr("lower(County) as little_name")))

R

display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality

Uruchamianie dowolnego zapytania SQL przy użyciu funkcji spark.sql()

Dowiedz się, jak używać funkcji Apache Spark spark.sql() do uruchamiania dowolnych zapytań SQL.

Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod używa funkcji Apache Spark spark.sql() do wykonywania zapytań dotyczących tabeli SQL przy użyciu składni SQL.

Python

display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))

Scala

display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))

R

display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))

Notes samouczka dotyczący ramki danych

Poniższy notes zawiera przykłady zapytań z tego samouczka.

Python

Notes samouczka ramki danych

Pobierz notes

Scala

Notes samouczka ramki danych

Pobierz notes

R

Notes samouczka ramki danych

Pobierz notes

Dodatkowe zasoby