Udostępnij za pośrednictwem


Samouczek: ładowanie i przekształcanie danych przy użyciu ramek danych platformy Apache Spark

W tym samouczku pokazano, jak ładować i przekształcać dane przy użyciu interfejsu API ramki danych platformy Apache Spark (PySpark), interfejsu API ramki danych Platformy Apache Spark Scala oraz interfejsu API SparkDataFrame sparkR w usłudze Azure Databricks.

Po ukończeniu tego samouczka dowiesz się, czym jest ramka danych i zapoznasz się z następującymi zadaniami:

Python

Zobacz też dokumentację interfejsu API platformy Apache Spark PySpark.

Scala

Zobacz też Dokumentację interfejsu API języka Scala platformy Apache Spark.

R

Zobacz też dokumentację interfejsu API platformy Apache SparkR.

Co to jest ramka danych?

Ramka danych to dwuwymiarowa struktura danych z kolumnami potencjalnie różnych typów. Ramka danych może być podobna do arkusza kalkulacyjnego, tabeli SQL lub słownika obiektów serii. Ramki danych platformy Apache Spark zapewniają bogaty zestaw funkcji (wybieranie kolumn, filtrowanie, sprzężenie, agregowanie), które umożliwiają efektywne rozwiązywanie typowych problemów z analizą danych.

Ramki danych platformy Apache Spark to abstrakcja oparta na odpornych rozproszonych zestawach danych (RDD). Ramki danych platformy Spark i platforma Spark SQL używają ujednoliconego aparatu planowania i optymalizacji, co pozwala uzyskać niemal identyczną wydajność we wszystkich obsługiwanych językach w usłudze Azure Databricks (Python, SQL, Scala i R).

Wymagania

Aby wykonać czynności opisane w poniższym samouczku, musisz spełnić następujące wymagania:

  • Aby użyć przykładów w tym samouczku, obszar roboczy musi mieć włączony wykaz aparatu Unity.

  • Przykłady w tym samouczku używają woluminu wykazu aparatu Unity do przechowywania przykładowych danych. Aby użyć tych przykładów, utwórz wolumin i użyj wykazu, schematu i nazw woluminów, aby ustawić ścieżkę woluminu używaną przez przykłady.

  • Musisz mieć następujące uprawnienia w katalogu aparatu Unity:

    • READ VOLUME i WRITE VOLUME, lub ALL PRIVILEGES dla woluminu używanego na potrzeby tego samouczka.
    • USE SCHEMA lub ALL PRIVILEGES schemat używany na potrzeby tego samouczka.
    • USE CATALOG lub ALL PRIVILEGES katalogu używanego na potrzeby tego samouczka.

    Aby ustawić te uprawnienia, zobacz uprawnienia administratora usługi Databricks lub katalogu aparatu Unity oraz zabezpieczane obiekty.

Napiwek

Aby zapoznać się z ukończonym notesem dla tego artykułu, zobacz Notesy samouczka dotyczące ramki danych.

Krok 1. Definiowanie zmiennych i ładowanie pliku CSV

Ten krok definiuje zmienne do użycia w tym samouczku, a następnie ładuje plik CSV zawierający dane nazwy dziecka z health.data.ny.gov do woluminu wykazu aparatu Unity.

  1. Otwórz nowy notes, klikając ikonę Nowa ikona . Aby dowiedzieć się, jak nawigować po notesach usługi Azure Databricks, zobacz Interfejs i kontrolki notesu usługi Databricks.

  2. Skopiuj i wklej następujący kod do nowej pustej komórki notesu. Zastąp <catalog-name>wartości , <schema-name>i <volume-name> nazwami wykazu, schematu i woluminu dla woluminu wykazu aparatu Unity. Zastąp <table_name> wybraną nazwą tabeli. Dane nazwy dziecka zostaną załadowane do tej tabeli w dalszej części tego samouczka.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val fileName = "rows.csv"
    val tableName = "<table_name>"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    
  3. Naciśnij Shift+Enter , aby uruchomić komórkę i utworzyć nową pustą komórkę.

  4. Skopiuj i wklej następujący kod do nowej pustej komórki notesu. Ten kod kopiuje rows.csv plik z health.data.ny.gov do woluminu wykazu aparatu Unity przy użyciu polecenia dbutuils usługi Databricks .

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
    

    Scala

    dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    
  5. Naciśnij , Shift+Enter aby uruchomić komórkę, a następnie przejdź do następnej komórki.

Krok 2. Tworzenie ramki danych

Ten krok tworzy ramkę danych o nazwie z df1 danymi testowymi, a następnie wyświetla jej zawartość.

  1. Skopiuj i wklej następujący kod do nowej pustej komórki notesu. Ten kod tworzy ramkę danych z danymi testowymi, a następnie wyświetla zawartość i schemat ramki danych.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = as.integer(c(2021)),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = as.integer(c(42))
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    
  2. Naciśnij , Shift+Enter aby uruchomić komórkę, a następnie przejdź do następnej komórki.

Krok 3. Ładowanie danych do ramki danych z pliku CSV

W tym kroku zostanie utworzona ramka danych o nazwie df_csv z pliku CSV, który został wcześniej załadowany do woluminu wykazu aparatu Unity. Zobacz spark.read.csv.

  1. Skopiuj i wklej następujący kod do nowej pustej komórki notesu. Ten kod ładuje dane baby name do ramki danych z pliku CSV, a następnie wyświetla zawartość ramki df_csv danych.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    

    Scala

    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    
    display(dfCsv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
        source="csv",
        header = TRUE,
        inferSchema = TRUE,
        delimiter = ",")
    
    display(df_csv)
    
  2. Naciśnij , Shift+Enter aby uruchomić komórkę, a następnie przejdź do następnej komórki.

Dane można załadować z wielu obsługiwanych formatów plików.

Krok 4. Wyświetlanie ramki danych i interakcja z nią

Wyświetlaj ramki danych dla dzieci i wchodzić z nimi w interakcje, korzystając z poniższych metod.

Dowiedz się, jak wyświetlić schemat ramki danych platformy Apache Spark. Platforma Apache Spark używa terminu schema do odwoływania się do nazw i typów danych kolumn w ramce danych.

Uwaga

Usługa Azure Databricks używa również terminu schema do opisania kolekcji tabel zarejestrowanych w wykazie.

  1. Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod przedstawia schemat ramek danych z .printSchema() metodą wyświetlania schematów dwóch ramek danych — w celu przygotowania do połączenia dwóch ramek danych.

    Python

    df_csv.printSchema()
    df1.printSchema()
    

    Scala

    dfCsv.printSchema()
    df1.printSchema()
    

    R

    printSchema(df_csv)
    printSchema(df1)
    
  2. Naciśnij , Shift+Enter aby uruchomić komórkę, a następnie przejdź do następnej komórki.

Zmienianie nazwy kolumny w ramce danych

Dowiedz się, jak zmienić nazwę kolumny w ramce danych.

  1. Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod zmienia nazwę kolumny w ramce df1_csv danych, aby dopasować je do odpowiedniej kolumny w ramce df1 danych. Ten kod używa metody Apache Spark withColumnRenamed() .

    Python

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema
    

    Scala

    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    

    R

    df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
    printSchema(df_csv)
    
  2. Naciśnij , Shift+Enter aby uruchomić komórkę, a następnie przejdź do następnej komórki.

Łączenie ramek danych

Dowiedz się, jak utworzyć nową ramkę danych, która dodaje wiersze jednej ramki danych do innej.

  1. Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod używa metody Apache Spark union() , aby połączyć zawartość pierwszej ramki df danych z ramą danych df_csv zawierającą dane nazw dzieci załadowanych z pliku CSV.

    Python

    df = df1.union(df_csv)
    display(df)
    

    Scala

    val df = df1.union(dfCsvRenamed)
    display(df)
    

    R

    display(df <- union(df1, df_csv))
    
  2. Naciśnij , Shift+Enter aby uruchomić komórkę, a następnie przejdź do następnej komórki.

Filtrowanie wierszy w ramce danych

Odkryj najpopularniejsze nazwy dzieci w zestawie danych, filtrując wiersze przy użyciu platformy Apache Spark .filter() lub .where() metod. Użyj filtrowania, aby wybrać podzbiór wierszy, które mają być zwracane lub modyfikowane w ramce danych. Nie ma różnicy w wydajności lub składni, jak pokazano w poniższych przykładach.

Korzystanie z metody .filter()

  1. Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod używa metody Apache Spark .filter() do wyświetlania tych wierszy w ramce danych z liczbą większą niż 50.

    Python
    display(df.filter(df["Count"] > 50))
    
    Scala
    display(df.filter(df("Count") > 50))
    
    R
    display(filteredDF <- filter(df, df$Count > 50))
    
  2. Naciśnij , Shift+Enter aby uruchomić komórkę, a następnie przejdź do następnej komórki.

Używanie metody .where()

  1. Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod używa metody Apache Spark .where() do wyświetlania tych wierszy w ramce danych z liczbą większą niż 50.

    Python
    display(df.where(df["Count"] > 50))
    
    Scala
    display(df.where(df("Count") > 50))
    
    R
    display(filtered_df <- where(df, df$Count > 50))
    
  2. Naciśnij , Shift+Enter aby uruchomić komórkę, a następnie przejdź do następnej komórki.

Wybieranie kolumn z ramki danych i kolejność według częstotliwości

Dowiedz się, która częstotliwość nazw dziecka z select() metodą określa kolumny z ramki danych do zwrócenia. Użyj platformy Apache Spark orderby i desc funkcji, aby uporządkować wyniki.

Moduł pyspark.sql dla platformy Apache Spark zapewnia obsługę funkcji SQL. Wśród tych funkcji używanych w tym samouczku są funkcje apache Spark orderBy(), desc()i expr() . Korzystanie z tych funkcji można włączyć, importując je do sesji zgodnie z potrzebami.

  1. Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod importuje desc() funkcję, a następnie używa metody Apache Spark i platformy Apache Spark select() orderBy() oraz desc() funkcji do wyświetlania najbardziej typowych nazw i ich liczby w kolejności malejącej.

    Python

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    Scala

    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    R

    display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
    
  2. Naciśnij , Shift+Enter aby uruchomić komórkę, a następnie przejdź do następnej komórki.

Tworzenie podzestawu ramki danych

Dowiedz się, jak utworzyć podzestaw ramki danych na podstawie istniejącej ramki danych.

  1. Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod używa metody Apache Spark filter , aby utworzyć nową ramkę danych ograniczającą dane według roku, liczby i płci. Używa metody Apache Spark select() , aby ograniczyć kolumny. Używa również platformy Apache Spark orderBy() i desc() funkcji do sortowania nowej ramki danych według liczby.

    Python

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    

    Scala

    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    
    display(subsetDF)
    

    R

    subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
    display(subsetDF)
    
  2. Naciśnij , Shift+Enter aby uruchomić komórkę, a następnie przejdź do następnej komórki.

Krok 5. Zapisywanie ramki danych

Dowiedz się, jak zapisać ramkę danych. Możesz zapisać ramkę danych w tabeli lub zapisać ramkę danych w pliku lub wielu plikach.

Zapisywanie ramki danych w tabeli

Usługa Azure Databricks domyślnie używa formatu usługi Delta Lake dla wszystkich tabel. Aby zapisać ramkę danych, musisz mieć CREATE uprawnienia tabeli w katalogu i schemacie.

  1. Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod zapisuje zawartość ramki danych w tabeli przy użyciu zmiennej zdefiniowanej na początku tego samouczka.

    Python

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    

    Scala

    df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
    

    R

    saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
    
  2. Naciśnij , Shift+Enter aby uruchomić komórkę, a następnie przejdź do następnej komórki.

Większość aplikacji platformy Apache Spark działa na dużych zestawach danych i w sposób rozproszony. Platforma Apache Spark zapisuje katalog plików, a nie jeden plik. Usługa Delta Lake dzieli foldery Parquet i pliki. Wiele systemów danych może odczytywać te katalogi plików. Usługa Azure Databricks zaleca używanie tabel przez ścieżki plików dla większości aplikacji.

Zapisywanie ramki danych w plikach JSON

  1. Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod zapisuje ramkę danych w katalogu plików JSON.

    Python

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    Scala

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    R

    write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
    
  2. Naciśnij , Shift+Enter aby uruchomić komórkę, a następnie przejdź do następnej komórki.

Odczytywanie ramki danych z pliku JSON

Dowiedz się, jak używać metody Apache Spark spark.read.format() do odczytywania danych JSON z katalogu do ramki danych.

  1. Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod wyświetla pliki JSON zapisane w poprzednim przykładzie.

    Python

    display(spark.read.format("json").json("/tmp/json_data"))
    

    Scala

    display(spark.read.format("json").json("/tmp/json_data"))
    

    R

    display(read.json("/tmp/json_data"))
    
  2. Naciśnij , Shift+Enter aby uruchomić komórkę, a następnie przejdź do następnej komórki.

Dodatkowe zadania: Uruchamianie zapytań SQL w PySpark, Scala i R

Ramki danych platformy Apache Spark udostępniają następujące opcje łączenia języka SQL z narzędziami PySpark, Scala i R. Poniższy kod można uruchomić w tym samym notesie, który został utworzony na potrzeby tego samouczka.

Określanie kolumny jako zapytania SQL

Dowiedz się, jak używać metody Apache Spark selectExpr() . Jest to wariant select() metody, która akceptuje wyrażenia SQL i zwraca zaktualizowaną ramkę danych. Ta metoda umożliwia użycie wyrażenia SQL, takiego jak upper.

  1. Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod używa metody Apache Spark selectExpr() i wyrażenia SQL upper , aby przekonwertować kolumnę ciągu na wielkie litery (i zmienić nazwę kolumny).

    Python

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    Scala

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    R

    display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
    
  2. Naciśnij , Shift+Enter aby uruchomić komórkę, a następnie przejdź do następnej komórki.

Użyj expr() polecenia , aby użyć składni SQL dla kolumny

Dowiedz się, jak importować i używać funkcji Apache Spark expr() do używania składni SQL w dowolnym miejscu, w jakim zostanie określona kolumna.

  1. Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod importuje expr() funkcję, a następnie używa funkcji Apache Spark expr() i wyrażenia SQL lower , aby przekonwertować kolumnę ciągu na małe litery (i zmienić nazwę kolumny).

    Python

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    

    Scala

    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    

    R

    display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
    # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
    
  2. Naciśnij , Shift+Enter aby uruchomić komórkę, a następnie przejdź do następnej komórki.

Uruchamianie dowolnego zapytania SQL przy użyciu funkcji spark.sql()

Dowiedz się, jak używać funkcji Apache Spark spark.sql() do uruchamiania dowolnych zapytań SQL.

  1. Skopiuj i wklej następujący kod do pustej komórki notesu. Ten kod używa funkcji Apache Spark spark.sql() do wykonywania zapytań dotyczących tabeli SQL przy użyciu składni SQL.

    Python

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    

    Scala

    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    

    R

    display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
    
  2. Naciśnij , Shift+Enter aby uruchomić komórkę, a następnie przejdź do następnej komórki.

Notesy samouczka dotyczące ramki danych

Poniższe notesy zawierają przykłady zapytań z tego samouczka.

Python

Samouczek dotyczący ramek danych przy użyciu języka Python

Pobierz notes

Scala

Samouczek dotyczący ramek danych przy użyciu języka Scala

Pobierz notes

R

Samouczek dotyczący ramek danych przy użyciu języka R

Pobierz notes

Dodatkowe zasoby