Öğretici: Apache Spark DataFrames kullanarak verileri yükleme ve dönüştürme

Bu öğreticide Apache Spark Python (PySpark) DataFrame API'sini, Apache Spark Scala DataFrame API'sini ve Azure Databricks'teki SparkR SparkDataFrame API'sini kullanarak verileri yükleme ve dönüştürme adımları gösterilmektedir.

Bu öğreticinin sonunda DataFrame'in ne olduğunu anlayacak ve aşağıdaki görevlere aşina olacaksınız:

Python

Ayrıca bkz. Apache Spark PySpark API başvurusu.

Scala

Ayrıca bkz. Apache Spark Scala API başvurusu.

R

Ayrıca bkz. Apache SparkR API başvurusu.

DataFrame nedir?

DataFrame, potansiyel olarak farklı türlerde sütunlar içeren iki boyutlu etiketli bir veri yapısıdır. DataFrame'i elektronik tablo, SQL tablosu veya seri nesneleri sözlüğü gibi düşünebilirsiniz. Apache Spark DataFrames, yaygın veri çözümleme sorunlarını verimli bir şekilde çözmenize olanak sağlayan zengin bir işlev kümesi (sütunları seçme, filtreleme, birleştirme, toplama) sağlar.

Apache Spark DataFrames, Dayanıklı Dağıtılmış Veri Kümelerinin (RDD) üzerine kurulmuş bir soyutlamadır. Spark DataFrames ve Spark SQL birleşik bir planlama ve iyileştirme altyapısı kullanarak Azure Databricks'te desteklenen tüm dillerde (Python, SQL, Scala ve R) neredeyse aynı performansı elde edebilirsiniz.

Gereksinimler

Aşağıdaki öğreticiyi tamamlamak için aşağıdaki gereksinimleri karşılamanız gerekir:

  • Bu öğreticideki örnekleri kullanmak için çalışma alanınızda Unity Kataloğu'nu etkinleştirmeniz gerekir.

  • Bu öğreticideki örneklerde örnek verileri depolamak için Unity Kataloğu birimi kullanılır. Bu örnekleri kullanmak için bir birim oluşturun ve bu birimin kataloğunu, şemasını ve birim adlarını kullanarak örnekler tarafından kullanılan birim yolunu ayarlayın.

  • Unity Kataloğu'nda aşağıdaki izinlere sahip olmanız gerekir:

    • READ VOLUME ve WRITE VOLUMEveya ALL PRIVILEGES bu öğretici için kullanılan birim için.
    • USE SCHEMA veya ALL PRIVILEGES bu öğretici için kullanılan şema için.
    • USE CATALOG veya ALL PRIVILEGES bu öğretici için kullanılan katalog için.

    Bu izinleri ayarlamak için Databricks yöneticinize veya Unity Kataloğu ayrıcalıklarına ve güvenli hale getirilebilir nesnelere bakın.

1. Adım: Değişkenleri tanımlama ve CSV dosyasını yükleme

Bu adım, bu öğreticide kullanılacak değişkenleri tanımlar ve ardından health.data.ny.gov Unity Kataloğu biriminize bebek adı verilerini içeren bir CSV dosyası yükler.

  1. Simgeye tıklayarak Yeni Simge yeni bir not defteri açın. Azure Databricks not defterlerinde gezinmeyi öğrenmek için bkz . Databricks not defteri arabirimi ve denetimleri.

  2. Aşağıdaki kodu kopyalayıp yeni boş not defteri hücresine yapıştırın. , <schema-name>ve <volume-name> yerine Unity Kataloğu biriminin katalog, şema ve birim adlarını yazın<catalog-name>. değerini, seçtiğiniz bir tablo adıyla değiştirin <table_name> . Bu öğreticinin ilerleyen bölümlerinde bu tabloya bebek adı verilerini yükleyebilirsiniz.

  3. Hücreyi çalıştırmak ve yeni bir boş hücre oluşturmak için basın Shift+Enter .

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_tables = catalog + "." + schema
    print(path_tables) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val file_name = "rows.csv"
    val table_name = "<table_name>"
    val path_volume = s"/Volumes/$catalog/$schema/$volume"
    val path_tables = s"$catalog.$schema.$table_name"
    print(path_volume) // Show the complete path
    print(path_tables) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_tables <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_tables) # Show the complete path
    
  4. Aşağıdaki kodu kopyalayıp yeni boş not defteri hücresine yapıştırın. Bu kod, Databricks rows.csv dbutuils komutunu kullanarak dosyayı health.data.ny.gov Unity Kataloğu biriminize kopyalar.

  5. Hücreyi çalıştırmak için basın Shift+Enter ve ardından sonraki hücreye geçin.

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
    

    Scala

    dbutils.fs.cp(download_url, s"$path_volume/$file_name")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    

2. Adım: DataFrame oluşturma

Bu adım, test verileriyle adlı df1 bir DataFrame oluşturur ve içeriğini görüntüler.

  1. Aşağıdaki kodu kopyalayıp yeni boş not defteri hücresine yapıştırın. Bu kod, test verileriyle Dataframe'i oluşturur ve ardından DataFrame'in içeriğini ve şemasını görüntüler.

  2. Hücreyi çalıştırmak için basın Shift+Enter ve ardından sonraki hücreye geçin.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = c(2021),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = c(42)
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    

3. Adım: CSV dosyasından DataFrame'e veri yükleme

Bu adım, Unity Kataloğu biriminize daha önce yüklediğiniz CSV dosyasından adlı df_csv bir DataFrame oluşturur. Bkz. spark.read.csv.

  1. Aşağıdaki kodu kopyalayıp yeni boş not defteri hücresine yapıştırın. Bu kod, CSV dosyasından DataFrame'e df_csv bebek adı verilerini yükler ve ardından DataFrame'in içeriğini görüntüler.

  2. Hücreyi çalıştırmak için basın Shift+Enter ve ardından sonraki hücreye geçin.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
      header=True,
      inferSchema=True,
      sep=",")
    display(df_csv)
    

    Scala

    val df_csv = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .csv(s"$path_volume/$file_name")
    
    display(df_csv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
      source="csv",
      header = TRUE,
      inferSchema = TRUE,
      delimiter = ",")
    
    display(df_csv)
    

Desteklenen birçok dosya biçiminden veri yükleyebilirsiniz.

4. Adım: DataFrame'inizi görüntüleme ve bunlarla etkileşim kurma

Aşağıdaki yöntemleri kullanarak bebek adlarınızı DataFrames olarak görüntüleyin ve bunlarla etkileşime geçin.

Apache Spark DataFrame şemasını görüntülemeyi öğrenin. Apache Spark, DataFrame'deki sütunların adlarına ve veri türlerine başvurmak için şema terimini kullanır.

Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, iki DataFrame'in şemalarını görüntüleme yöntemiyle DataFrame'lerinizin .printSchema() şemasını gösterir. İki DataFrame'in birleşimine hazırlanmak için.

Python

df_csv.printSchema()
df1.printSchema()

Scala

df_csv.printSchema()
df1.printSchema()

R

printSchema(df_csv)
printSchema(df1)

Not

Azure Databricks, kataloğa kayıtlı tablo koleksiyonunu açıklamak için şema terimini de kullanır.

DataFrame'de sütunu yeniden adlandırma

DataFrame'de bir sütunu yeniden adlandırmayı öğrenin.

Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, DataFrame'deki bir sütunu DataFrame'deki df1_csvdf1 ilgili sütunla eşleşecek şekilde yeniden adlandırır. Bu kod Apache Spark withColumnRenamed() yöntemini kullanır.

Python

df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema

Scala

val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()

R

df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)

DataFrame'leri birleştirme

Bir DataFrame'in satırlarını başka bir DataFrame'e ekleyen yeni bir DataFrame oluşturmayı öğrenin.

Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, ilk DataFrame'inizin df içeriğini CSV dosyasından yüklenen bebek adları verilerini içeren DataFrame df_csv ile birleştirmek için Apache Spark union() yöntemini kullanır.

Python

df = df1.union(df_csv)
display(df)

Scala

val df = df1.union(df_csv_renamed)
display(df)

R

display(df <- union(df1, df_csv))

DataFrame'de satırları filtreleme

Apache Spark'ı .filter() veya .where() yöntemleri kullanarak satırları filtreleyerek veri kümenizdeki en popüler bebek adlarını keşfedin. DataFrame'de döndürülecek veya değiştirilebilen satırların bir alt kümesini seçmek için filtrelemeyi kullanın. Aşağıdaki örneklerde görüldüğü gibi performans veya söz diziminde fark yoktur.

.filter() yöntemini kullanma

Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, 50'den fazla sayıyla Bu satırları DataFrame'de görüntülemek için Apache Spark .filter() yöntemini kullanır.

Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))

.where() yöntemini kullanma

Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, 50'den fazla sayıyla Bu satırları DataFrame'de görüntülemek için Apache Spark .where() yöntemini kullanır.

Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))

DataFrame'den sütunları seçme ve sıklık ölçütüne göre sıralama

Döndürülecek DataFrame sütunlarını belirtmek için yöntemiyle select() hangi bebek adı sıklığı hakkında bilgi edinin. Sonuçları sıralamak için Apache Spark orderby ve desc işlevlerini kullanın.

Apache Spark için pyspark.sql modülü SQL işlevleri için destek sağlar. Bu öğreticide kullandığımız bu işlevler arasında Apache Spark orderBy(), desc()ve expr() işlevleri yer alır. Gerektiğinde bunları oturumunuza aktararak bu işlevlerin kullanımını etkinleştirirsiniz.

Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod işlevi içeri aktarır desc() ve en yaygın adları ve bunların sayılarını azalan sırada görüntülemek için Apache select() Spark yöntemini ve Apache Spark orderBy()desc() ve işlevlerini kullanır.

Python

from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

Scala

import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

R

display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))

DataFrame alt kümesi oluşturma

Mevcut bir DataFrame'den bir dataframe alt kümesi oluşturmayı öğrenin.

Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, verileri yıla, sayıya ve cinsiyete göre kısıtlayan yeni bir DataFrame oluşturmak için Apache Spark filter yöntemini kullanır. Sütunları sınırlamak için Apache Spark select() yöntemini kullanır. Ayrıca Apache Spark orderBy() ve desc() işlevlerini kullanarak yeni DataFrame'i sayıya göre sıralar.

Python

subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)

Scala

val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))

display(subsetDF)

R

subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)

5. Adım: DataFrame'i kaydetme

DataFrame'i kaydetmeyi öğrenin. DataFrame'inizi bir tabloya kaydedebilir veya DataFrame'i bir dosyaya veya birden çok dosyaya yazabilirsiniz.

DataFrame'i tabloya kaydetme

Azure Databricks varsayılan olarak tüm tablolar için Delta Lake biçimini kullanır. DataFrame'inizi kaydetmek için katalog ve şemada tablo ayrıcalıklarınız olmalıdır CREATE .

Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, bu öğreticinin başında tanımladığınız değişkeni kullanarak DataFrame'in içeriğini bir tabloya kaydeder.

Python

df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")

# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(fs"$path_tables" + "." + s"$table_name")

Scala

df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")

// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$path_volume" + "." + s"$table_name")

R

saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")

Apache Spark uygulamalarının çoğu büyük veri kümelerinde ve dağıtılmış bir şekilde çalışır. Apache Spark, tek bir dosya yerine dosyaların dizinini yazar. Delta Lake, Parquet klasörlerini ve dosyalarını böler. Birçok veri sistemi bu dosya dizinlerini okuyabilir. Azure Databricks, çoğu uygulama için dosya yolları üzerinden tabloların kullanılmasını önerir.

DataFrame'i JSON dosyalarına kaydetme

Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, DataFrame'i JSON dosyalarının dizinine kaydeder.

Python

df.write.format("json").save("/tmp/json_data")

# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")

Scala

df.write.format("json").save("/tmp/json_data")

// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")

R

write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")

JSON dosyasından DataFrame'i okuma

Apache Spark spark.read.format() yöntemini kullanarak bir dizinden DataFrame'e JSON verilerini okumayı öğrenin.

Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, önceki örnekte kaydettiğiniz JSON dosyalarını görüntüler.

Python

display(spark.read.format("json").json("/tmp/json_data"))

Scala

display(spark.read.format("json").json("/tmp/json_data"))

R

display(read.json("/tmp/json_data"))

Ek görevler: PySpark, Scala ve R'de SQL sorguları çalıştırma

Apache Spark DataFrames, SQL'i PySpark, Scala ve R ile birleştirmek için aşağıdaki seçenekleri sağlar. Aşağıdaki kodu bu öğretici için oluşturduğunuz not defterinde çalıştırabilirsiniz.

Sütunu SQL sorgusu olarak belirtme

Apache Spark selectExpr() yöntemini kullanmayı öğrenin. Bu, SQL ifadelerini kabul eden ve güncelleştirilmiş bir DataFrame döndüren yöntemin bir değişkenidir select() . Bu yöntem, gibi upperbir SQL ifadesi kullanmanıza olanak tanır.

Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, bir dize sütununu büyük harfe dönüştürmek (ve sütunu yeniden adlandırmak) için Apache Spark selectExpr() yöntemini ve SQL upper ifadesini kullanır.

Python

display(df.selectExpr("Count", "upper(County) as big_name"))

Scala

display(df.selectExpr("Count", "upper(County) as big_name"))

R

display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))

Sütun için SQL söz dizimi kullanmak için kullanın expr()

Bir sütunun belirtileceği her yerde SQL söz dizimini kullanmak için Apache Spark expr() işlevini içeri aktarmayı ve kullanmayı öğrenin.

Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod işlevi içeri aktarır expr() ve ardından Apache Spark expr() işlevini ve SQL lower ifadesini kullanarak bir dize sütununu küçük harfe dönüştürür (ve sütunu yeniden adlandırır).

Python

from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))

Scala

import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function

display(df.select(col("Count"), expr("lower(County) as little_name")))

R

display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality

spark.sql() işlevini kullanarak rastgele bir SQL sorgusu çalıştırma

Rastgele SQL sorguları çalıştırmak için Apache Spark spark.sql() işlevini kullanmayı öğrenin.

Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, SQL söz dizimini kullanarak bir SQL tablosunu sorgulamak için Apache Spark spark.sql() işlevini kullanır.

Python

display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))

Scala

display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))

R

display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))

DataFrame öğretici not defteri

Aşağıdaki not defteri, bu öğreticideki örnek sorguları içerir.

Python

DataFrames öğretici not defteri

Not defterini alma

Scala

DataFrames öğretici not defteri

Not defterini alma

R

DataFrames öğretici not defteri

Not defterini alma

Ek kaynaklar