Kurz: Načtení a transformace dat pomocí datových rámců Apache Sparku

V tomto kurzu se dozvíte, jak načíst a transformovat data pomocí rozhraní API datového rámce Apache Spark Python (PySpark), rozhraní API datového rámce Apache Spark Scala a rozhraní API SparkR SparkDataFrame v Azure Databricks.

Na konci tohoto kurzu pochopíte, co datový rámec je, a seznámíte se s následujícími úlohami:

Python

Viz také referenční informace k rozhraní API Apache Spark PySpark.

Scala

Viz také referenční informace k rozhraní Apache Spark Scala API.

R

Viz také referenční informace k rozhraní Apache SparkR API.

Co je datový rámec?

Datový rámec je dvourozměrná datová struktura s sloupci potenciálně různých typů. Datový rámec si můžete představit jako tabulku, tabulku SQL nebo slovník objektů řady. Datové rámce Apache Spark poskytují bohatou sadu funkcí (výběr sloupců, filtrování, spojení, agregace), které umožňují efektivně řešit běžné problémy s analýzou dat.

Datové rámce Apache Sparku jsou abstrakce založená na odolných distribuovaných datových sadách (RDD). Datové rámce Sparku a Spark SQL používají jednotný modul pro plánování a optimalizaci, který umožňuje získat téměř stejný výkon ve všech podporovaných jazycích v Azure Databricks (Python, SQL, Scala a R).

Požadavky

K dokončení následujícího kurzu musíte splnit následující požadavky:

  • Pokud chcete použít příklady v tomto kurzu, musí mít váš pracovní prostor povolený katalog Unity.

  • Příklady v tomto kurzu používají k ukládání ukázkových dat svazek katalogu Unity. Pokud chcete tyto příklady použít, vytvořte svazek a použijte katalog, schéma a názvy svazků k nastavení cesty svazku používané příklady.

  • V katalogu Unity musíte mít následující oprávnění:

    • READ VOLUME a WRITE VOLUME) nebo ALL PRIVILEGES pro svazek použitý pro tento kurz.
    • USE SCHEMA nebo ALL PRIVILEGES pro schéma použité pro tento kurz.
    • USE CATALOG nebo ALL PRIVILEGES pro katalog použitý pro tento kurz.

    Pokud chcete tato oprávnění nastavit, podívejte se na správce Databricks nebo na oprávnění katalogu Unity a zabezpečitelné objekty.

Tip

Dokončený poznámkový blok pro tento článek najdete v poznámkových blocích kurzu datového rámce.

Krok 1: Definování proměnných a načtení souboru CSV

Tento krok definuje proměnné pro použití v tomto kurzu a pak načte soubor CSV obsahující data názvu dítěte z health.data.ny.gov do svazku katalogu Unity.

  1. Kliknutím Nová ikona na ikonu otevřete nový poznámkový blok. Informace o procházení poznámkových bloků Azure Databricks najdete v tématu Rozhraní a ovládací prvky poznámkového bloku Databricks.

  2. Zkopírujte a vložte následující kód do nové prázdné buňky poznámkového bloku. Nahraďte <catalog-name>katalog, <volume-name><schema-name>schéma a názvy svazků pro svazek katalogu Unity. Nahraďte <table_name> zvoleným názvem tabulky. Data názvu dítěte načtete do této tabulky později v tomto kurzu.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val fileName = "rows.csv"
    val tableName = "<table_name>"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    
  3. Stisknutím spustíte Shift+Enter buňku a vytvoříte novou prázdnou buňku.

  4. Zkopírujte a vložte následující kód do nové prázdné buňky poznámkového bloku. Tento kód zkopíruje rows.csv soubor z health.data.ny.gov do svazku katalogu Unity pomocí příkazu Databricks dbutuils .

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
    

    Scala

    dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    
  5. Stisknutím klávesy Shift+Enter spusťte buňku a přejděte na další buňku.

Krok 2: Vytvoření datového rámce

Tento krok vytvoří datový rámec s testovacími df1 daty a pak zobrazí jeho obsah.

  1. Zkopírujte a vložte následující kód do nové prázdné buňky poznámkového bloku. Tento kód vytvoří datový rámec s testovacími daty a pak zobrazí obsah a schéma datového rámce.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = as.integer(c(2021)),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = as.integer(c(42))
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    
  2. Stisknutím klávesy Shift+Enter spusťte buňku a přejděte na další buňku.

Krok 3: Načtení dat do datového rámce ze souboru CSV

Tento krok vytvoří datový rámec pojmenovaný df_csv ze souboru CSV, který jste předtím načetli do svazku katalogu Unity. Viz spark.read.csv.

  1. Zkopírujte a vložte následující kód do nové prázdné buňky poznámkového bloku. Tento kód načte data názvu dítěte do datového rámce df_csv ze souboru CSV a pak zobrazí obsah datového rámce.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    

    Scala

    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    
    display(dfCsv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
        source="csv",
        header = TRUE,
        inferSchema = TRUE,
        delimiter = ",")
    
    display(df_csv)
    
  2. Stisknutím klávesy Shift+Enter spusťte buňku a přejděte na další buňku.

Můžete načíst data z mnoha podporovaných formátů souborů.

Krok 4: Zobrazení datového rámce a interakce s ním

Pomocí následujících metod můžete zobrazit datové rámce s názvy dětí a pracovat s nimi.

Zjistěte, jak zobrazit schéma datového rámce Apache Spark. Apache Spark používá schéma termínů k odkazování na názvy a datové typy sloupců v datovém rámci.

Poznámka:

Azure Databricks také používá schéma termínů k popisu kolekce tabulek zaregistrovaných v katalogu.

  1. Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód ukazuje schéma datových rámců s metodou .printSchema() pro zobrazení schémat dvou datových rámců – pro přípravu na sjednocení těchto dvou datových rámců.

    Python

    df_csv.printSchema()
    df1.printSchema()
    

    Scala

    dfCsv.printSchema()
    df1.printSchema()
    

    R

    printSchema(df_csv)
    printSchema(df1)
    
  2. Stisknutím klávesy Shift+Enter spusťte buňku a přejděte na další buňku.

Přejmenování sloupce v datovém rámci

Zjistěte, jak přejmenovat sloupec v datovém rámci.

  1. Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód přejmenuje sloupec v datovém df1_csv rámci tak, aby odpovídal příslušnému sloupci v datovém rámci df1 . Tento kód používá metodu Apache Spark withColumnRenamed() .

    Python

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema
    

    Scala

    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    

    R

    df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
    printSchema(df_csv)
    
  2. Stisknutím klávesy Shift+Enter spusťte buňku a přejděte na další buňku.

Kombinování datových rámců

Zjistěte, jak vytvořit nový datový rámec, který přidá řádky jednoho datového rámce do druhého.

  1. Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá metodu Apache Spark union() ke kombinování obsahu prvního datového rámce df s datovým rámcem df_csv obsahujícím data názvů dětí načtená ze souboru CSV.

    Python

    df = df1.union(df_csv)
    display(df)
    

    Scala

    val df = df1.union(dfCsvRenamed)
    display(df)
    

    R

    display(df <- union(df1, df_csv))
    
  2. Stisknutím klávesy Shift+Enter spusťte buňku a přejděte na další buňku.

Filtrování řádků v datovém rámci

Seznamte se s nejoblíbenějšími názvy dětí v sadě dat filtrováním řádků pomocí Apache Sparku .filter() nebo .where() metod. Pomocí filtrování vyberte podmnožinu řádků, které chcete vrátit nebo upravit v datovém rámci. V výkonu nebo syntaxi není žádný rozdíl, jak je vidět v následujících příkladech.

Použití metody .filter()

  1. Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá metodu Apache Spark .filter() k zobrazení těchto řádků v datovém rámci s počtem více než 50.

    Python
    display(df.filter(df["Count"] > 50))
    
    Scala
    display(df.filter(df("Count") > 50))
    
    R
    display(filteredDF <- filter(df, df$Count > 50))
    
  2. Stisknutím klávesy Shift+Enter spusťte buňku a přejděte na další buňku.

Použití metody .where()

  1. Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá metodu Apache Spark .where() k zobrazení těchto řádků v datovém rámci s počtem více než 50.

    Python
    display(df.where(df["Count"] > 50))
    
    Scala
    display(df.where(df("Count") > 50))
    
    R
    display(filtered_df <- where(df, df$Count > 50))
    
  2. Stisknutím klávesy Shift+Enter spusťte buňku a přejděte na další buňku.

Výběr sloupců z datového rámce a pořadí podle frekvence

Přečtěte si, jakou frekvenci select() názvu dítěte metoda určuje sloupce z datového rámce, které se mají vrátit. K seřazení výsledků použijte Apache Spark orderby a desc funkce.

Modul pyspark.sql pro Apache Spark poskytuje podporu funkcí SQL. Mezi tyto funkce, které používáme v tomto kurzu, patří Apache Spark orderBy(), desc()a expr() funkce. Použití těchto funkcí povolíte tak, že je podle potřeby naimportujete do relace.

  1. Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód naimportuje desc() funkci a pak použije metodu Apache Spark select() a Apache Spark orderBy() a desc() funkce k zobrazení nejběžnějších názvů a jejich počtu v sestupném pořadí.

    Python

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    Scala

    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    R

    display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
    
  2. Stisknutím klávesy Shift+Enter spusťte buňku a přejděte na další buňku.

Vytvoření datového rámce podmnožina

Zjistěte, jak vytvořit podmnožinu datového rámce z existujícího datového rámce.

  1. Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá metodu Apache Spark filter k vytvoření nového datového rámce, který omezuje data podle roku, počtu a pohlaví. K omezení sloupců používá metodu Apache Spark select() . Používá také Apache Spark orderBy() a desc() funkce k seřazení nového datového rámce podle počtu.

    Python

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    

    Scala

    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    
    display(subsetDF)
    

    R

    subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
    display(subsetDF)
    
  2. Stisknutím klávesy Shift+Enter spusťte buňku a přejděte na další buňku.

Krok 5: Uložení datového rámce

Naučte se ukládat datový rámec. Datový rámec můžete uložit do tabulky nebo zapsat datový rámec do souboru nebo do více souborů.

Uložení datového rámce do tabulky

Azure Databricks ve výchozím nastavení používá formát Delta Lake pro všechny tabulky. Pokud chcete datový rámec uložit, musíte mít CREATE oprávnění tabulky k katalogu a schématu.

  1. Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód uloží obsah datového rámce do tabulky pomocí proměnné, kterou jste definovali na začátku tohoto kurzu.

    Python

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    

    Scala

    df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
    

    R

    saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
    
  2. Stisknutím klávesy Shift+Enter spusťte buňku a přejděte na další buňku.

Většina aplikací Apache Spark pracuje na velkých datových sadách a distribuovaným způsobem. Apache Spark zapisuje adresář souborů místo jednoho souboru. Delta Lake rozdělí složky a soubory Parquet. Mnoho datových systémů může tyto adresáře souborů číst. Azure Databricks doporučuje používat tabulky přes cesty k souborům pro většinu aplikací.

Uložení datového rámce do souborů JSON

  1. Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód uloží datový rámec do adresáře souborů JSON.

    Python

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    Scala

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    R

    write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
    
  2. Stisknutím klávesy Shift+Enter spusťte buňku a přejděte na další buňku.

Čtení datového rámce ze souboru JSON

Naučte se používat metodu Apache Spark spark.read.format() ke čtení dat JSON z adresáře do datového rámce.

  1. Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód zobrazí soubory JSON, které jste uložili v předchozím příkladu.

    Python

    display(spark.read.format("json").json("/tmp/json_data"))
    

    Scala

    display(spark.read.format("json").json("/tmp/json_data"))
    

    R

    display(read.json("/tmp/json_data"))
    
  2. Stisknutím klávesy Shift+Enter spusťte buňku a přejděte na další buňku.

Další úlohy: Spouštění dotazů SQL v PySpark, Scala a R

Datové rámce Apache Spark nabízejí následující možnosti pro kombinování SQL s PySpark, Scala a R. Následující kód můžete spustit ve stejném poznámkovém bloku, který jste vytvořili pro účely tohoto kurzu.

Zadání sloupce jako dotazu SQL

Naučte se používat metodu Apache Spark selectExpr() . Jedná se o variantu select() metody, která přijímá výrazy SQL a vrací aktualizovaný datový rámec. Tato metoda umožňuje použít výraz SQL, například upper.

  1. Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá metodu Apache Spark selectExpr() a výraz SQL upper k převodu sloupce řetězce na velká písmena (a přejmenování sloupce).

    Python

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    Scala

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    R

    display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
    
  2. Stisknutím klávesy Shift+Enter spusťte buňku a přejděte na další buňku.

Použití expr() syntaxe SQL pro sloupec

Naučte se importovat a používat funkci Apache Spark expr() k použití syntaxe SQL kdekoli, kde by byl zadaný sloupec.

  1. Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód naimportuje expr() funkci a pak použije funkci Apache Spark expr() a výraz SQL lower k převodu sloupce řetězce na malá písmena (a přejmenování sloupce).

    Python

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    

    Scala

    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    

    R

    display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
    # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
    
  2. Stisknutím klávesy Shift+Enter spusťte buňku a přejděte na další buňku.

Spuštění libovolného dotazu SQL pomocí funkce spark.sql()

Naučte se používat funkci Apache Spark spark.sql() ke spouštění libovolných dotazů SQL.

  1. Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá funkci Apache Spark spark.sql() k dotazování tabulky SQL pomocí syntaxe SQL.

    Python

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    

    Scala

    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    

    R

    display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
    
  2. Stisknutím klávesy Shift+Enter spusťte buňku a přejděte na další buňku.

Poznámkové bloky k datovým rámcům

Následující poznámkové bloky obsahují příklady dotazů z tohoto kurzu.

Python

Kurz datových rámců s využitím Pythonu

Získat poznámkový blok

Scala

Kurz datových rámců s využitím scaly

Získat poznámkový blok

R

Kurz datových rámců s využitím jazyka R

Získat poznámkový blok

Další materiály