Aracılığıyla paylaş


Öğretici: Apache Spark DataFrames kullanarak verileri yükleme ve dönüştürme

Bu öğreticide Apache Spark Python (PySpark) DataFrame API'sini, Apache Spark Scala DataFrame API'sini ve Azure Databricks'teki SparkR SparkDataFrame API'sini kullanarak verileri yükleme ve dönüştürme adımları gösterilmektedir.

Bu öğreticinin sonunda DataFrame'in ne olduğunu anlayacak ve aşağıdaki görevlere aşina olacaksınız:

Python

Ayrıca bkz. Apache Spark PySpark API başvurusu.

Scala

Ayrıca bkz. Apache Spark Scala API başvurusu.

R

Ayrıca bkz. Apache SparkR API başvurusu.

DataFrame nedir?

DataFrame, potansiyel olarak farklı türlerde sütunlar içeren iki boyutlu etiketli bir veri yapısıdır. DataFrame'i elektronik tablo, SQL tablosu veya seri nesneleri sözlüğü gibi düşünebilirsiniz. Apache Spark DataFrames, yaygın veri çözümleme sorunlarını verimli bir şekilde çözmenize olanak sağlayan zengin bir işlev kümesi (sütunları seçme, filtreleme, birleştirme, toplama) sağlar.

Apache Spark DataFrames, Dayanıklı Dağıtılmış Veri Kümelerinin (RDD) üzerine kurulmuş bir soyutlamadır. Spark DataFrames ve Spark SQL birleşik bir planlama ve iyileştirme altyapısı kullanarak Azure Databricks'te desteklenen tüm dillerde (Python, SQL, Scala ve R) neredeyse aynı performansı elde edebilirsiniz.

Gereksinimler

Aşağıdaki öğreticiyi tamamlamak için aşağıdaki gereksinimleri karşılamanız gerekir:

  • Bu öğreticideki örnekleri kullanmak için çalışma alanınızda Unity Kataloğu'nu etkinleştirmeniz gerekir.

  • Bu öğreticideki örneklerde örnek verileri depolamak için Unity Kataloğu birimi kullanılır. Bu örnekleri kullanmak için bir birim oluşturun ve bu birimin kataloğunu, şemasını ve birim adlarını kullanarak örnekler tarafından kullanılan birim yolunu ayarlayın.

  • Unity Kataloğu'nda aşağıdaki izinlere sahip olmanız gerekir:

    • READ VOLUME ve WRITE VOLUMEveya ALL PRIVILEGES bu öğretici için kullanılan birim için.
    • USE SCHEMA veya ALL PRIVILEGES bu öğretici için kullanılan şema için.
    • USE CATALOG veya ALL PRIVILEGES bu öğretici için kullanılan katalog için.

    Bu izinleri ayarlamak için Databricks yöneticinize veya Unity Kataloğu ayrıcalıklarına ve güvenli hale getirilebilir nesnelere bakın.

İpucu

Bu makalenin tamamlanmış not defteri için bkz . DataFrame öğretici not defterleri.

1. Adım: Değişkenleri tanımlama ve CSV dosyasını yükleme

Bu adım, bu öğreticide kullanılacak değişkenleri tanımlar ve ardından health.data.ny.gov Unity Kataloğu biriminize bebek adı verilerini içeren bir CSV dosyası yükler.

  1. Simgeye tıklayarak Yeni Simge yeni bir not defteri açın. Azure Databricks not defterlerinde gezinmeyi öğrenmek için bkz . Databricks not defteri arabirimi ve denetimleri.

  2. Aşağıdaki kodu kopyalayıp yeni boş not defteri hücresine yapıştırın. , <schema-name>ve <volume-name> yerine Unity Kataloğu biriminin katalog, şema ve birim adlarını yazın<catalog-name>. değerini, seçtiğiniz bir tablo adıyla değiştirin <table_name> . Bu öğreticinin ilerleyen bölümlerinde bu tabloya bebek adı verilerini yükleyebilirsiniz.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val fileName = "rows.csv"
    val tableName = "<table_name>"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    
  3. Hücreyi çalıştırmak ve yeni bir boş hücre oluşturmak için basın Shift+Enter .

  4. Aşağıdaki kodu kopyalayıp yeni boş not defteri hücresine yapıştırın. Bu kod, Databricks rows.csv dbutuils komutunu kullanarak dosyayı health.data.ny.gov Unity Kataloğu biriminize kopyalar.

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
    

    Scala

    dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    
  5. Hücreyi çalıştırmak için basın Shift+Enter ve ardından sonraki hücreye geçin.

2. Adım: DataFrame oluşturma

Bu adım, test verileriyle adlı df1 bir DataFrame oluşturur ve içeriğini görüntüler.

  1. Aşağıdaki kodu kopyalayıp yeni boş not defteri hücresine yapıştırın. Bu kod, test verileriyle DataFrame'i oluşturur ve ardından DataFrame'in içeriğini ve şemasını görüntüler.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = as.integer(c(2021)),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = as.integer(c(42))
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    
  2. Hücreyi çalıştırmak için basın Shift+Enter ve ardından sonraki hücreye geçin.

3. Adım: CSV dosyasından DataFrame'e veri yükleme

Bu adım, Unity Kataloğu biriminize daha önce yüklediğiniz CSV dosyasından adlı df_csv bir DataFrame oluşturur. Bkz. spark.read.csv.

  1. Aşağıdaki kodu kopyalayıp yeni boş not defteri hücresine yapıştırın. Bu kod, CSV dosyasından DataFrame'e df_csv bebek adı verilerini yükler ve ardından DataFrame'in içeriğini görüntüler.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    

    Scala

    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    
    display(dfCsv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
        source="csv",
        header = TRUE,
        inferSchema = TRUE,
        delimiter = ",")
    
    display(df_csv)
    
  2. Hücreyi çalıştırmak için basın Shift+Enter ve ardından sonraki hücreye geçin.

Desteklenen birçok dosya biçiminden veri yükleyebilirsiniz.

4. Adım: DataFrame'inizi görüntüleme ve bunlarla etkileşim kurma

Aşağıdaki yöntemleri kullanarak bebek adlarınızı DataFrames olarak görüntüleyin ve bunlarla etkileşime geçin.

Apache Spark DataFrame şemasını görüntülemeyi öğrenin. Apache Spark, DataFrame'deki sütunların adlarına ve veri türlerine başvurmak için şema terimini kullanır.

Not

Azure Databricks, kataloğa kayıtlı tablo koleksiyonunu açıklamak için şema terimini de kullanır.

  1. Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, iki DataFrame'in şemalarını görüntüleme yöntemiyle DataFrame'lerinizin .printSchema() şemasını gösterir. İki DataFrame'in birleşimine hazırlanmak için.

    Python

    df_csv.printSchema()
    df1.printSchema()
    

    Scala

    dfCsv.printSchema()
    df1.printSchema()
    

    R

    printSchema(df_csv)
    printSchema(df1)
    
  2. Hücreyi çalıştırmak için basın Shift+Enter ve ardından sonraki hücreye geçin.

DataFrame'de sütunu yeniden adlandırma

DataFrame'de bir sütunu yeniden adlandırmayı öğrenin.

  1. Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, DataFrame'deki bir sütunu DataFrame'deki df1_csv df1 ilgili sütunla eşleşecek şekilde yeniden adlandırır. Bu kod Apache Spark withColumnRenamed() yöntemini kullanır.

    Python

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema
    

    Scala

    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    

    R

    df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
    printSchema(df_csv)
    
  2. Hücreyi çalıştırmak için basın Shift+Enter ve ardından sonraki hücreye geçin.

DataFrame'leri birleştirme

Bir DataFrame'in satırlarını başka bir DataFrame'e ekleyen yeni bir DataFrame oluşturmayı öğrenin.

  1. Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, ilk DataFrame'inizin df içeriğini CSV dosyasından yüklenen bebek adları verilerini içeren DataFrame df_csv ile birleştirmek için Apache Spark union() yöntemini kullanır.

    Python

    df = df1.union(df_csv)
    display(df)
    

    Scala

    val df = df1.union(dfCsvRenamed)
    display(df)
    

    R

    display(df <- union(df1, df_csv))
    
  2. Hücreyi çalıştırmak için basın Shift+Enter ve ardından sonraki hücreye geçin.

DataFrame'de satırları filtreleme

Apache Spark'ı .filter() veya .where() yöntemleri kullanarak satırları filtreleyerek veri kümenizdeki en popüler bebek adlarını keşfedin. DataFrame'de döndürülecek veya değiştirilebilen satırların bir alt kümesini seçmek için filtrelemeyi kullanın. Aşağıdaki örneklerde görüldüğü gibi performans veya söz diziminde fark yoktur.

.filter() yöntemini kullanma

  1. Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, 50'den fazla sayıyla Bu satırları DataFrame'de görüntülemek için Apache Spark .filter() yöntemini kullanır.

    Python
    display(df.filter(df["Count"] > 50))
    
    Scala
    display(df.filter(df("Count") > 50))
    
    R
    display(filteredDF <- filter(df, df$Count > 50))
    
  2. Hücreyi çalıştırmak için basın Shift+Enter ve ardından sonraki hücreye geçin.

.where() yöntemini kullanma

  1. Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, 50'den fazla sayıyla Bu satırları DataFrame'de görüntülemek için Apache Spark .where() yöntemini kullanır.

    Python
    display(df.where(df["Count"] > 50))
    
    Scala
    display(df.where(df("Count") > 50))
    
    R
    display(filtered_df <- where(df, df$Count > 50))
    
  2. Hücreyi çalıştırmak için basın Shift+Enter ve ardından sonraki hücreye geçin.

DataFrame'den sütunları seçme ve sıklık ölçütüne göre sıralama

Döndürülecek DataFrame sütunlarını belirtmek için yöntemiyle select() hangi bebek adı sıklığı hakkında bilgi edinin. Sonuçları sıralamak için Apache Spark orderby ve desc işlevlerini kullanın.

Apache Spark için pyspark.sql modülü SQL işlevleri için destek sağlar. Bu öğreticide kullandığımız bu işlevler arasında Apache Spark orderBy(), desc()ve expr() işlevleri yer alır. Gerektiğinde bunları oturumunuza aktararak bu işlevlerin kullanımını etkinleştirirsiniz.

  1. Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod işlevi içeri aktarır desc() ve en yaygın adları ve bunların sayılarını azalan sırada görüntülemek için Apache select() Spark yöntemini ve Apache Spark orderBy() desc() ve işlevlerini kullanır.

    Python

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    Scala

    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    R

    display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
    
  2. Hücreyi çalıştırmak için basın Shift+Enter ve ardından sonraki hücreye geçin.

DataFrame alt kümesi oluşturma

Mevcut bir DataFrame'den bir dataframe alt kümesi oluşturmayı öğrenin.

  1. Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, verileri yıla, sayıya ve cinsiyete göre kısıtlayan yeni bir DataFrame oluşturmak için Apache Spark filter yöntemini kullanır. Sütunları sınırlamak için Apache Spark select() yöntemini kullanır. Ayrıca Apache Spark orderBy() ve desc() işlevlerini kullanarak yeni DataFrame'i sayıya göre sıralar.

    Python

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    

    Scala

    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    
    display(subsetDF)
    

    R

    subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
    display(subsetDF)
    
  2. Hücreyi çalıştırmak için basın Shift+Enter ve ardından sonraki hücreye geçin.

5. Adım: DataFrame'i kaydetme

DataFrame'i kaydetmeyi öğrenin. DataFrame'inizi bir tabloya kaydedebilir veya DataFrame'i bir dosyaya veya birden çok dosyaya yazabilirsiniz.

DataFrame'i tabloya kaydetme

Azure Databricks varsayılan olarak tüm tablolar için Delta Lake biçimini kullanır. DataFrame'inizi kaydetmek için katalog ve şemada tablo ayrıcalıklarınız olmalıdır CREATE .

  1. Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, bu öğreticinin başında tanımladığınız değişkeni kullanarak DataFrame'in içeriğini bir tabloya kaydeder.

    Python

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    

    Scala

    df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
    

    R

    saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
    
  2. Hücreyi çalıştırmak için basın Shift+Enter ve ardından sonraki hücreye geçin.

Apache Spark uygulamalarının çoğu büyük veri kümelerinde ve dağıtılmış bir şekilde çalışır. Apache Spark, tek bir dosya yerine dosyaların dizinini yazar. Delta Lake, Parquet klasörlerini ve dosyalarını böler. Birçok veri sistemi bu dosya dizinlerini okuyabilir. Azure Databricks, çoğu uygulama için dosya yolları üzerinden tabloların kullanılmasını önerir.

DataFrame'i JSON dosyalarına kaydetme

  1. Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, DataFrame'i JSON dosyalarının dizinine kaydeder.

    Python

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    Scala

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    R

    write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
    
  2. Hücreyi çalıştırmak için basın Shift+Enter ve ardından sonraki hücreye geçin.

JSON dosyasından DataFrame'i okuma

Apache Spark spark.read.format() yöntemini kullanarak bir dizinden DataFrame'e JSON verilerini okumayı öğrenin.

  1. Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, önceki örnekte kaydettiğiniz JSON dosyalarını görüntüler.

    Python

    display(spark.read.format("json").json("/tmp/json_data"))
    

    Scala

    display(spark.read.format("json").json("/tmp/json_data"))
    

    R

    display(read.json("/tmp/json_data"))
    
  2. Hücreyi çalıştırmak için basın Shift+Enter ve ardından sonraki hücreye geçin.

Ek görevler: PySpark, Scala ve R'de SQL sorguları çalıştırma

Apache Spark DataFrames, SQL'i PySpark, Scala ve R ile birleştirmek için aşağıdaki seçenekleri sağlar. Aşağıdaki kodu bu öğretici için oluşturduğunuz not defterinde çalıştırabilirsiniz.

Sütunu SQL sorgusu olarak belirtme

Apache Spark selectExpr() yöntemini kullanmayı öğrenin. Bu, SQL ifadelerini kabul eden ve güncelleştirilmiş bir DataFrame döndüren yöntemin bir değişkenidir select() . Bu yöntem, gibi upperbir SQL ifadesi kullanmanıza olanak tanır.

  1. Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, bir dize sütununu büyük harfe dönüştürmek (ve sütunu yeniden adlandırmak) için Apache Spark selectExpr() yöntemini ve SQL upper ifadesini kullanır.

    Python

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    Scala

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    R

    display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
    
  2. Hücreyi çalıştırmak için basın Shift+Enter ve ardından sonraki hücreye geçin.

Sütun için SQL söz dizimi kullanmak için kullanın expr()

Bir sütunun belirtileceği her yerde SQL söz dizimini kullanmak için Apache Spark expr() işlevini içeri aktarmayı ve kullanmayı öğrenin.

  1. Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod işlevi içeri aktarır expr() ve ardından Apache Spark expr() işlevini ve SQL lower ifadesini kullanarak bir dize sütununu küçük harfe dönüştürür (ve sütunu yeniden adlandırır).

    Python

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    

    Scala

    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    

    R

    display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
    # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
    
  2. Hücreyi çalıştırmak için basın Shift+Enter ve ardından sonraki hücreye geçin.

spark.sql() işlevini kullanarak rastgele bir SQL sorgusu çalıştırma

Rastgele SQL sorguları çalıştırmak için Apache Spark spark.sql() işlevini kullanmayı öğrenin.

  1. Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, SQL söz dizimini kullanarak bir SQL tablosunu sorgulamak için Apache Spark spark.sql() işlevini kullanır.

    Python

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    

    Scala

    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    

    R

    display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
    
  2. Hücreyi çalıştırmak için basın Shift+Enter ve ardından sonraki hücreye geçin.

DataFrame öğreticisi not defterleri

Aşağıdaki not defterleri bu öğreticideki örnek sorguları içerir.

Python

Python kullanarak DataFrames öğreticisi

Not defterini alma

Scala

Scala kullanarak DataFrames öğreticisi

Not defterini alma

R

R kullanarak DataFrames öğreticisi

Not defterini alma

Ek kaynaklar