Megosztás a következőn keresztül:


Adatkeretek és táblák használata az R-ben

Fontos

A Databricks SparkR elavult a Databricks Runtime 16.0-s és újabb verziókban. A Databricks inkább sparklyr használatát javasolja.

Ez a cikk bemutatja, hogyan használhat R-csomagokat, például SparkR, sparklyrés dplyr az R data.frames, Spark DataFramesés memóriabeli táblák használatához.

Vegye figyelembe, hogy a SparkR, a Sparklyr és a dplyr használatakor előfordulhat, hogy az összes csomaggal végrehajthat egy adott műveletet, és használhatja a legkényelmesebb csomagot. Lekérdezés futtatásához meghívhat például SparkR::sql, sparklyr::sdf_sqlés dplyr::selectfüggvényeket. Máskor előfordulhat, hogy egy műveletet csak egy vagy két csomaggal hajthat végre, és a választott művelet a használati forgatókönyvtől függ. A sparklyr::sdf_quantile hívási módja például kissé eltér a dplyr::percentile_approxhívási módjától, annak ellenére, hogy mindkét függvény kiszámít kvantiliseket.

Az SQL-t használhatja hídként a SparkR és a Sparklyr között. A SparkR::sql használatával például lekérdezheti a sparklyrrel létrehozott táblákat. Az sparklyr::sdf_sql használatával lekérdezheti a SparkR-lel létrehozott táblákat. És dplyr kód mindig le lesz fordítva az SQL-be a memóriában a futtatás előtt. Lásd még API együttműködés és SQL-fordítás.

SparkR, sparklyr és dplyr betöltése

A SparkR-, sparklyr- és dplyr-csomagok az Azure Databricks fürtökretelepített Databricks-futtatókörnyezet részét képezik. Ezért nem kell hívnia a szokásos install.package-t, mielőtt elkezdené hívni ezeket a csomagokat. Ezeket a csomagokat azonban először a library-val kell betöltenie. Egy R -jegyzetfüzet egy Azure Databricks-munkaterületen például futtassa a következő kódot egy jegyzetfüzetcellában a SparkR, a sparklyr és a dplyr betöltéséhez:

library(SparkR)
library(sparklyr)
library(dplyr)

Sparklyr csatlakoztatása klaszterhez

A sparklyr betöltése után meg kell hívnia sparklyr::spark_connect a fürthöz való csatlakozáshoz, megadva a databricks kapcsolati módszert. Futtassa például a következő kódot egy jegyzetfüzetcellában a jegyzetfüzetet üzemeltető fürthöz való csatlakozáshoz:

sc <- spark_connect(method = "databricks")

Ezzel szemben egy Azure Databricks-jegyzetfüzet már létrehoz egy SparkSession-t a fürtön a SparkR használatához, így nem kell meghívnia SparkR::sparkR.session-t, hogy elkezdhesse a SparkR-t használni.

JSON-adatfájl feltöltése a munkaterületre

A cikkben szereplő számos példakód az Azure Databricks-munkaterület egy adott helyén található adatokon alapul, adott oszlopnevekkel és adattípusokkal. A példakód adatai egy book.json nevű JSON-fájlból származnak a GitHubról. A fájl lekérése és feltöltése a munkaterületre:

  1. Lépjen a GitHub books.json fájljára, és egy szövegszerkesztővel másolja a tartalmát egy books.json nevű fájlba a helyi számítógépen.
  2. Az Azure Databricks-munkaterület oldalsávjában kattintson a Katalóguselemre.
  3. Kattintson a Tábla létrehozásaelemre.
  4. A Fájl feltöltése fülön helyezze a books.json fájlt a helyi gépéről a Fájlok feltöltése dobozba. Vagy válassza , kattintson a-re tallózáshoz, és böngéssze ki a books.json fájlt a helyi gépén.

Az Azure Databricks alapértelmezés szerint feltölti a helyi books.json fájlt a munkaterület DBFS helyére a /FileStore/tables/books.jsonelérési úttal.

Ne kattintson a Tábla létrehozása a felhasználói felülettel vagy a Tábla létrehozása a jegyzetfüzetbengombra. A cikkben szereplő példakódok a feltöltött books.json fájlban lévő adatokat használják ezen a DBFS-helyen.

JSON-adatok beolvasása DataFrame-be

A sparklyr::spark_read_json használatával beolvassa a feltöltött JSON-fájlt egy DataFrame-be, megadva a kapcsolatot, a JSON-fájl elérési útját és az adatok belső táblaképének nevét. Ebben a példában meg kell adnia, hogy a book.json fájl több sort tartalmaz-e. Az oszlopok sémájának megadása nem kötelező. Ellenkező esetben a Sparklyr alapértelmezés szerint az oszlopok sémáját követi. Futtassa például a következő kódot egy jegyzetfüzetcellában a feltöltött JSON-fájl adatainak beolvasásához egy jsonDFnevű DataFrame-be:

jsonDF <- spark_read_json(
  sc      = sc,
  name    = "jsonTable",
  path    = "/FileStore/tables/books.json",
  options = list("multiLine" = TRUE),
  columns = c(
    author    = "character",
    country   = "character",
    imageLink = "character",
    language  = "character",
    link      = "character",
    pages     = "integer",
    title     = "character",
    year      = "integer"
  )
)

A DataFrame első sorainak nyomtatásához használhatja a SparkR::head, a SparkR::showvagy a sparklyr::collect. Alapértelmezés szerint head alapértelmezés szerint az első hat sort nyomtatja ki. show és collect nyomtassa ki az első 10 sort. Futtassa például a következő kódot egy jegyzetfüzetcellában a jsonDFnevű DataFrame első sorainak nyomtatásához:

head(jsonDF)

# Source: spark<?> [?? x 8]
#   author                  country        image…¹ langu…² link  pages title  year
#   <chr>                   <chr>          <chr>   <chr>   <chr> <int> <chr> <int>
# 1 Chinua Achebe           Nigeria        images… English "htt…   209 Thin…  1958
# 2 Hans Christian Andersen Denmark        images… Danish  "htt…   784 Fair…  1836
# 3 Dante Alighieri         Italy          images… Italian "htt…   928 The …  1315
# 4 Unknown                 Sumer and Akk… images… Akkadi… "htt…   160 The … -1700
# 5 Unknown                 Achaemenid Em… images… Hebrew  "htt…   176 The …  -600
# 6 Unknown                 India/Iran/Ir… images… Arabic  "htt…   288 One …  1200
# … with abbreviated variable names ¹​imageLink, ²​language

show(jsonDF)

# Source: spark<jsonTable> [?? x 8]
#    author                  country       image…¹ langu…² link  pages title  year
#    <chr>                   <chr>         <chr>   <chr>   <chr> <int> <chr> <int>
#  1 Chinua Achebe           Nigeria       images… English "htt…   209 Thin…  1958
#  2 Hans Christian Andersen Denmark       images… Danish  "htt…   784 Fair…  1836
#  3 Dante Alighieri         Italy         images… Italian "htt…   928 The …  1315
#  4 Unknown                 Sumer and Ak… images… Akkadi… "htt…   160 The … -1700
#  5 Unknown                 Achaemenid E… images… Hebrew  "htt…   176 The …  -600
#  6 Unknown                 India/Iran/I… images… Arabic  "htt…   288 One …  1200
#  7 Unknown                 Iceland       images… Old No… "htt…   384 Njál…  1350
#  8 Jane Austen             United Kingd… images… English "htt…   226 Prid…  1813
#  9 Honoré de Balzac        France        images… French  "htt…   443 Le P…  1835
# 10 Samuel Beckett          Republic of … images… French… "htt…   256 Moll…  1952
# … with more rows, and abbreviated variable names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows

collect(jsonDF)

# A tibble: 100 × 8
#    author                  country       image…¹ langu…² link  pages title  year
#    <chr>                   <chr>         <chr>   <chr>   <chr> <int> <chr> <int>
#  1 Chinua Achebe           Nigeria       images… English "htt…   209 Thin…  1958
#  2 Hans Christian Andersen Denmark       images… Danish  "htt…   784 Fair…  1836
#  3 Dante Alighieri         Italy         images… Italian "htt…   928 The …  1315
#  4 Unknown                 Sumer and Ak… images… Akkadi… "htt…   160 The … -1700
#  5 Unknown                 Achaemenid E… images… Hebrew  "htt…   176 The …  -600
#  6 Unknown                 India/Iran/I… images… Arabic  "htt…   288 One …  1200
#  7 Unknown                 Iceland       images… Old No… "htt…   384 Njál…  1350
#  8 Jane Austen             United Kingd… images… English "htt…   226 Prid…  1813
#  9 Honoré de Balzac        France        images… French  "htt…   443 Le P…  1835
# 10 Samuel Beckett          Republic of … images… French… "htt…   256 Moll…  1952
# … with 90 more rows, and abbreviated variable names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows

SQL-lekérdezések futtatása, írás és olvasás táblából

A dplyr függvényekkel SQL-lekérdezéseket futtathat DataFrame-en. Futtassa a következő kódot egy jegyzetfüzetcellában például a dplyr::group_by és a dployr::count használatával a jsonDFnevű DataFrame-ből származó szerzők számának lekéréséhez. A dplyr::arrange és a dplyr::desc használatával csökkenő sorrendbe rendezheti az eredményt szám szerint. Ezután alapértelmezés szerint nyomtassa ki az első 10 sort.

group_by(jsonDF, author) %>%
  count() %>%
  arrange(desc(n))

# Source:     spark<?> [?? x 2]
# Ordered by: desc(n)
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Gustave Flaubert           2
#  8 Homer                      2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with more rows
# ℹ Use `print(n = ...)` to see more rows

Ezt követően az sparklyr::spark_write_table használatával egy Azure Databricks-táblába írhatja az eredményt. Futtassa például a következő kódot egy jegyzetfüzetcellában a lekérdezés újrafuttatásához, majd írja az eredményt egy json_books_aggnevű táblába:

group_by(jsonDF, author) %>%
  count() %>%
  arrange(desc(n)) %>%
  spark_write_table(
    name = "json_books_agg",
    mode = "overwrite"
  )

A tábla létrehozásának ellenőrzéséhez használhatja a sparklyr::sdf_sql és a SparkR::showDF a tábla adatainak megjelenítéséhez. Futtassa például a következő kódot egy jegyzetfüzetcellában a tábla DataFrame-be való lekérdezéséhez, majd a sparklyr::collect használatával alapértelmezés szerint a DataFrame első 10 sorát nyomtathatja ki:

collect(sdf_sql(sc, "SELECT * FROM json_books_agg"))

# A tibble: 82 × 2
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Homer                      2
#  8 Gustave Flaubert           2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows

A sparklyr::spark_read_table is használhatja a hasonló műveletekhez. Futtassa például a következő kódot egy jegyzetfüzetcellában, hogy lekérdezhesse a jsonDF nevű előző DataFrame-et egy DataFrame-be, majd a sparklyr::collect használatával alapértelmezés szerint kinyomtathatja a DataFrame első 10 sorát:

fromTable <- spark_read_table(
  sc   = sc,
  name = "json_books_agg"
)

collect(fromTable)

# A tibble: 82 × 2
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Homer                      2
#  8 Gustave Flaubert           2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows

Oszlopok és számítási oszlopértékek hozzáadása DataFrame-ben

A dplyr függvényekkel oszlopokat adhat hozzá a DataFrame-ekhez és kiszámíthatja az oszlopok értékeit.

Futtassa például a következő kódot egy jegyzetfüzetcellában a jsonDFnevű DataFrame tartalmának lekéréséhez. A dplyr::mutate használatával vegyen fel egy todaynevű oszlopot, és töltse ki ezt az új oszlopot az aktuális időbélyeggel. Ezután írja be ezeket a tartalmakat egy withDate nevű új DataFrame-be, és a dplyr::collect használatával alapértelmezés szerint kinyomtatja az új DataFrame első 10 sorát.

Jegyzet

dplyr::mutate csak olyan argumentumokat fogad el, amelyek megfelelnek a Hive beépített függvényeinek (más néven UDF-eknek) és a beépített aggregátumfüggvényeknek (más néven UDAF-eknek). Általános információkhoz lásd: Hive Functions. A jelen szakaszban szereplő dátumfüggvényekről további információt Dátumfüggvényekcímű témakörben talál.

withDate <- jsonDF %>%
  mutate(today = current_timestamp())

collect(withDate)

# A tibble: 100 × 9
#    author    country image…¹ langu…² link  pages title  year today
#    <chr>     <chr>   <chr>   <chr>   <chr> <int> <chr> <int> <dttm>
#  1 Chinua A… Nigeria images… English "htt…   209 Thin…  1958 2022-09-27 21:32:59
#  2 Hans Chr… Denmark images… Danish  "htt…   784 Fair…  1836 2022-09-27 21:32:59
#  3 Dante Al… Italy   images… Italian "htt…   928 The …  1315 2022-09-27 21:32:59
#  4 Unknown   Sumer … images… Akkadi… "htt…   160 The … -1700 2022-09-27 21:32:59
#  5 Unknown   Achaem… images… Hebrew  "htt…   176 The …  -600 2022-09-27 21:32:59
#  6 Unknown   India/… images… Arabic  "htt…   288 One …  1200 2022-09-27 21:32:59
#  7 Unknown   Iceland images… Old No… "htt…   384 Njál…  1350 2022-09-27 21:32:59
#  8 Jane Aus… United… images… English "htt…   226 Prid…  1813 2022-09-27 21:32:59
#  9 Honoré d… France  images… French  "htt…   443 Le P…  1835 2022-09-27 21:32:59
# 10 Samuel B… Republ… images… French… "htt…   256 Moll…  1952 2022-09-27 21:32:59
# … with 90 more rows, and abbreviated variable names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows

Most a dplyr::mutate használatával adjon hozzá még két oszlopot a withDate DataFrame tartalmához. Az új month és year oszlopok tartalmazzák a today oszlopból származó numerikus hónapot és évet. Ezután írja ezeket a tartalmakat egy withMMyyyynevű új DataFrame-be, és alapértelmezés szerint a dplyr::select és a dplyr::collect használatával nyomtassa ki az új DataFrame első tíz sorának author, title, month és year oszlopait.

withMMyyyy <- withDate %>%
  mutate(month = month(today),
         year  = year(today))

collect(select(withMMyyyy, c("author", "title", "month", "year")))

# A tibble: 100 × 4
#    author                  title                                     month  year
#    <chr>                   <chr>                                     <int> <int>
#  1 Chinua Achebe           Things Fall Apart                             9  2022
#  2 Hans Christian Andersen Fairy tales                                   9  2022
#  3 Dante Alighieri         The Divine Comedy                             9  2022
#  4 Unknown                 The Epic Of Gilgamesh                         9  2022
#  5 Unknown                 The Book Of Job                               9  2022
#  6 Unknown                 One Thousand and One Nights                   9  2022
#  7 Unknown                 Njál's Saga                                   9  2022
#  8 Jane Austen             Pride and Prejudice                           9  2022
#  9 Honoré de Balzac        Le Père Goriot                                9  2022
# 10 Samuel Beckett          Molloy, Malone Dies, The Unnamable, the …     9  2022
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows

Most a dplyr::mutate használatával adjon hozzá még két oszlopot a withMMyyyy DataFrame tartalmához. Az új formatted_date oszlopok a today oszlop yyyy-MM-dd részét, míg az új day oszlop az új formatted_date oszlop numerikus napját tartalmazza. Ezután írja be ezeket a tartalmakat egy withUnixTimestampnevű új DataFrame-be, és a dplyr::select és a dplyr::collect használatával alapértelmezés szerint az új DataFrame első tíz sorának title, formatted_dateés day oszlopait nyomtathatja ki:

withUnixTimestamp <- withMMyyyy %>%
  mutate(formatted_date = date_format(today, "yyyy-MM-dd"),
         day            = dayofmonth(formatted_date))

collect(select(withUnixTimestamp, c("title", "formatted_date", "day")))

# A tibble: 100 × 3
#    title                                           formatted_date   day
#    <chr>                                           <chr>          <int>
#  1 Things Fall Apart                               2022-09-27        27
#  2 Fairy tales                                     2022-09-27        27
#  3 The Divine Comedy                               2022-09-27        27
#  4 The Epic Of Gilgamesh                           2022-09-27        27
#  5 The Book Of Job                                 2022-09-27        27
#  6 One Thousand and One Nights                     2022-09-27        27
#  7 Njál's Saga                                     2022-09-27        27
#  8 Pride and Prejudice                             2022-09-27        27
#  9 Le Père Goriot                                  2022-09-27        27
# 10 Molloy, Malone Dies, The Unnamable, the trilogy 2022-09-27        27
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows

Ideiglenes nézet létrehozása

Létrehozhat névvel ellátott ideiglenes nézeteket a memóriában, amelyek meglévő DataFrame-eken alapulnak. Futtassa például a következő kódot egy notebook cellában, hogy SparkR::createOrReplaceTempView használva lekérhesse az előző, jsonTable nevű DataFrame tartalmát, és készítsen belőle egy ideiglenes nézetet timestampTablenéven. Ezután sparklyr::spark_read_table használatával olvassa be az ideiglenes nézet tartalmát. Alapértelmezés szerint a sparklyr::collect használatával nyomtasd ki az ideiglenes tábla első 10 sorát.

createOrReplaceTempView(withTimestampDF, viewName = "timestampTable")

spark_read_table(
  sc = sc,
  name = "timestampTable"
) %>% collect()

# A tibble: 100 × 10
#    author    country image…¹ langu…² link  pages title  year today
#    <chr>     <chr>   <chr>   <chr>   <chr> <int> <chr> <int> <dttm>
#  1 Chinua A… Nigeria images… English "htt…   209 Thin…  1958 2022-09-27 21:11:56
#  2 Hans Chr… Denmark images… Danish  "htt…   784 Fair…  1836 2022-09-27 21:11:56
#  3 Dante Al… Italy   images… Italian "htt…   928 The …  1315 2022-09-27 21:11:56
#  4 Unknown   Sumer … images… Akkadi… "htt…   160 The … -1700 2022-09-27 21:11:56
#  5 Unknown   Achaem… images… Hebrew  "htt…   176 The …  -600 2022-09-27 21:11:56
#  6 Unknown   India/… images… Arabic  "htt…   288 One …  1200 2022-09-27 21:11:56
#  7 Unknown   Iceland images… Old No… "htt…   384 Njál…  1350 2022-09-27 21:11:56
#  8 Jane Aus… United… images… English "htt…   226 Prid…  1813 2022-09-27 21:11:56
#  9 Honoré d… France  images… French  "htt…   443 Le P…  1835 2022-09-27 21:11:56
# 10 Samuel B… Republ… images… French… "htt…   256 Moll…  1952 2022-09-27 21:11:56
# … with 90 more rows, 1 more variable: month <chr>, and abbreviated variable
#   names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows, and `colnames()` to see all variable names

Statisztikai elemzés végrehajtása DataFrame-en

A sparklyr és a dplyr statisztikai elemzésekhez is használható.

Hozzon létre például egy DataFrame-et a statisztikák futtatásához. Ehhez futtassa a következő kódot egy jegyzetfüzetcellában, hogy sparklyr::sdf_copy_to segítségével írja az R-be beépített iris adatkészlet tartalmát egy irisnevű DataFrame-be. Alapértelmezés szerint a sparklyr::sdf_collect használatával nyomtassa ki az ideiglenes tábla első 10 sorát.

irisDF <- sdf_copy_to(
  sc        = sc,
  x         = iris,
  name      = "iris",
  overwrite = TRUE
)

sdf_collect(irisDF, "row-wise")

# A tibble: 150 × 5
#    Sepal_Length Sepal_Width Petal_Length Petal_Width Species
#           <dbl>       <dbl>        <dbl>       <dbl> <chr>
#  1          5.1         3.5          1.4         0.2 setosa
#  2          4.9         3            1.4         0.2 setosa
#  3          4.7         3.2          1.3         0.2 setosa
#  4          4.6         3.1          1.5         0.2 setosa
#  5          5           3.6          1.4         0.2 setosa
#  6          5.4         3.9          1.7         0.4 setosa
#  7          4.6         3.4          1.4         0.3 setosa
#  8          5           3.4          1.5         0.2 setosa
#  9          4.4         2.9          1.4         0.2 setosa
# 10          4.9         3.1          1.5         0.1 setosa
# … with 140 more rows
# ℹ Use `print(n = ...)` to see more rows

Most a dplyr::group_by használatával csoportosíthatja a sorokat a Species oszlop szerint. A Sepal_Length oszlop 25., 50., 75. és 100. kvantilisére dplyr::summarize és dplyr::percentile_approx használatával kiszámíthatja az összegző statisztikákat a Speciesszerint. Használja a sparklyr::collect parancsot az eredmények kinyomtatásához.

Jegyzet

dplyr::summarize csak olyan argumentumokat fogad el, amelyek megfelelnek a Hive beépített függvényeinek (más néven UDF-eknek) és a beépített aggregátumfüggvényeknek (más néven UDAF-eknek). Az általános információkért lásd: Hive Functions. További információkért a percentile_approx-ról, tekintse meg a beépített összesítő függvényeket (UDAF).

quantileDF <- irisDF %>%
  group_by(Species) %>%
  summarize(
    quantile_25th = percentile_approx(
      Sepal_Length,
      0.25
    ),
    quantile_50th = percentile_approx(
      Sepal_Length,
      0.50
    ),
    quantile_75th = percentile_approx(
      Sepal_Length,
      0.75
    ),
    quantile_100th = percentile_approx(
      Sepal_Length,
      1.0
    )
  )

collect(quantileDF)

# A tibble: 3 × 5
#   Species    quantile_25th quantile_50th quantile_75th quantile_100th
#   <chr>              <dbl>         <dbl>         <dbl>          <dbl>
# 1 virginica            6.2           6.5           6.9            7.9
# 2 versicolor           5.6           5.9           6.3            7
# 3 setosa               4.8           5             5.2            5.8

Hasonló eredmények kiszámíthatók például sparklyr::sdf_quantilehasználatával:

print(sdf_quantile(
  x = irisDF %>%
    filter(Species == "virginica"),
  column = "Sepal_Length",
  probabilities = c(0.25, 0.5, 0.75, 1.0)
))

# 25%  50%  75% 100%
# 6.2  6.5  6.9  7.9