Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Bu öğreticide Apache Spark Python (PySpark) DataFrame API'sini, Apache Spark Scala DataFrame API'sini ve Azure Databricks'teki SparkR SparkDataFrame API'sini kullanarak verileri yükleme ve dönüştürme adımları gösterilmektedir.
Bu öğreticinin sonunda DataFrame'in ne olduğunu anlayacak ve aşağıdaki görevlere aşina olacaksınız:
Piton
- Değişkenleri tanımlama ve ortak verileri Unity Kataloğu birimine kopyalama
- Python ile DataFrame oluşturma
- CSV dosyasından DataFrame'e veri yükleme
- DataFrame'i görüntüleme ve bunlarla etkileşim kurma
- DataFrame'i kaydetme
- PySpark'ta SQL sorguları çalıştırma
Ayrıca bkz. Apache Spark PySpark API başvurusu.
Scala programlama dili
- Değişkenleri tanımlama ve ortak verileri Unity Kataloğu birimine kopyalama
- Scala ile DataFrame oluşturma
- CSV dosyasından DataFrame'e veri yükleme
- DataFrame'i görüntüleme ve bunlarla etkileşim kurma
- DataFrame'i kaydetme
- Apache Spark'ta SQL sorguları çalıştırma
Ayrıca bkz. Apache Spark Scala API başvurusu.
R
- Değişkenleri tanımlama ve ortak verileri Unity Kataloğu birimine kopyalama
- SparkR SparkDataFrames oluşturma
- CSV dosyasından DataFrame'e veri yükleme
- DataFrame'i görüntüleme ve bunlarla etkileşim kurma
- DataFrame'i kaydetme
- SparkR'da SQL sorguları çalıştırma
Ayrıca bkz. Apache SparkR API başvurusu.
DataFrame nedir?
DataFrame, potansiyel olarak farklı türlerde sütunlar içeren iki boyutlu etiketli bir veri yapısıdır. DataFrame'i elektronik tablo, SQL tablosu veya seri nesneleri sözlüğü gibi düşünebilirsiniz. Apache Spark DataFrames, yaygın veri çözümleme sorunlarını verimli bir şekilde çözmenize olanak sağlayan zengin bir işlev kümesi (sütunları seçme, filtreleme, birleştirme, toplama) sağlar.
Apache Spark DataFrames, Dayanıklı Dağıtılmış Veri Kümelerinin (RDD) üzerine kurulmuş bir soyutlamadır. Spark DataFrames ve Spark SQL birleşik bir planlama ve iyileştirme altyapısı kullanarak Azure Databricks'te desteklenen tüm dillerde (Python, SQL, Scala ve R) neredeyse aynı performansı elde edebilirsiniz.
Gereksinimler
Aşağıdaki öğreticiyi tamamlamak için aşağıdaki gereksinimleri karşılamanız gerekir:
Bu öğreticideki örnekleri kullanmak için çalışma alanınızda Unity Kataloğu etkinleştirilmesi gerekir.
Bu öğreticideki örneklerde, örnek verileri depolamak için Unity Catalog hacmi kullanılır. Bu örnekleri kullanmak için bir birim oluşturun ve bu birimin kataloğunu, şemasını ve birim adlarını kullanarak örnekler tarafından kullanılan birim yolunu ayarlayın.
Unity Kataloğu'nda aşağıdaki izinlere sahip olmanız gerekir:
-
READ VOLUME
veWRITE VOLUME
veyaALL PRIVILEGES
bu öğretici için kullanılan birim için. - Bu öğreticide kullanılan şema için
USE SCHEMA
veyaALL PRIVILEGES
. - Bu öğreticide kullanılan katalog için
USE CATALOG
veyaALL PRIVILEGES
.
Bu izinleri ayarlamak için Databricks yöneticinize veya Unity Kataloğu ayrıcalıklarına ve güvenli hale getirilebilir nesnelere bakın.
-
İpucu
Bu makalenin tamamlanmış not defteri için bkz . DataFrame öğretici not defterleri.
1. Adım: Değişkenleri tanımlama ve CSV dosyasını yükleme
Bu adım, bu öğreticide kullanılacak değişkenleri tanımlar ve ardından health.data.ny.gov'daki bebek adı verilerini içeren bir CSV dosyasını Unity Kataloğu biriminize yükler.
Simgeye tıklayarak
yeni bir not defteri açın. Azure Databricks not defterlerinde gezinmeyi öğrenmek için bkz. Not defteri görünümünü özelleştirme.
Aşağıdaki kodu kopyalayıp yeni boş not defteri hücresine yapıştırın.
<catalog-name>
,<schema-name>
ve<volume-name>
yerine Unity Kataloğu biriminin katalog, şema ve birim adlarını yazın.<table_name>
'i seçtiğiniz bir tablo adıyla değiştirin. Bu öğreticinin ilerleyen bölümlerinde bu tabloya bebek adı verilerini yükleyebilirsiniz.Piton
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete path
Scala programlama dili
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete path
Hücreyi çalıştırmak ve yeni bir boş hücre oluşturmak için basın
Shift+Enter
.Aşağıdaki kodu kopyalayıp yeni boş not defteri hücresine yapıştırın. Bu kod,
rows.csv
komutunu kullanarak dosyasını health.data.ny.gov Unity Kataloğu biriminize kopyalar.Piton
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
Scala programlama dili
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
2. Adım: DataFrame oluşturma
Bu adım, test verileriyle adlı df1
bir DataFrame oluşturur ve içeriğini görüntüler.
Aşağıdaki kodu kopyalayıp yeni boş not defteri hücresine yapıştırın. Bu kod, test verileriyle DataFrame'i oluşturur ve ardından DataFrame'in içeriğini ve şemasını görüntüler.
Piton
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala programlama dili
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
3. Adım: CSV dosyasından DataFrame'e veri yükleme
Bu adım, Unity Kataloğu biriminize daha önce yüklediğiniz CSV dosyasından df_csv
adlı bir DataFrame oluşturur. Bkz. spark.read.csv.
Aşağıdaki kodu kopyalayıp yeni boş not defteri hücresine yapıştırın. Bu kod, CSV dosyasından DataFrame'e
df_csv
bebek adı verilerini yükler ve ardından DataFrame'in içeriğini görüntüler.Piton
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala programlama dili
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
Desteklenen birçok dosya biçiminden veri yükleyebilirsiniz.
4. Adım: DataFrame'inizi görüntüleme ve bunlarla etkileşim kurma
Aşağıdaki yöntemleri kullanarak bebek adlarınızı DataFrames olarak görüntüleyin ve bunlarla etkileşime geçin.
DataFrame şemasını yazdırma
Apache Spark DataFrame şemasını görüntülemeyi öğrenin. Apache Spark, DataFrame'deki sütunların adlarına ve veri türlerine başvurmak için şema
Not
Azure Databricks, kataloğa kayıtlı tablo koleksiyonunu açıklamak için şema terimini de kullanır.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, iki DataFrame'in şemalarını görüntülemek için
.printSchema()
yöntemiyle DataFrame'lerinizin şemasını gösterir. İki DataFrame'in birleşimine hazırlanmak için.Piton
df_csv.printSchema() df1.printSchema()
Scala programlama dili
dfCsv.printSchema() df1.printSchema()
R
printSchema(df_csv) printSchema(df1)
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
DataFrame'de sütunu yeniden adlandırma
DataFrame'de bir sütunu yeniden adlandırmayı öğrenin.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod,
df1_csv
DataFrame'deki bir sütunudf1
DataFrame'deki ilgili sütunla eşleşecek şekilde yeniden adlandırır. Bu kod Apache SparkwithColumnRenamed()
yöntemini kullanır.Piton
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema
Scala programlama dili
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
DataFrame'leri birleştirme
Bir DataFrame'in satırlarını başka bir DataFrame'e ekleyen yeni bir DataFrame oluşturmayı öğrenin.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, ilk DataFrame'inizin
union()
içeriğini CSV dosyasından yüklenen bebek adları verilerini içeren DataFramedf
ile birleştirmek için Apache Sparkdf_csv
yöntemini kullanır.Piton
df = df1.union(df_csv) display(df)
Scala programlama dili
val df = df1.union(dfCsvRenamed) display(df)
R
display(df <- union(df1, df_csv))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
DataFrame'de satırları filtreleme
Apache Spark .filter()
veya .where()
yöntemlerini kullanarak satırları filtreleyerek veri kümenizdeki en popüler bebek adlarını keşfedin. DataFrame'de döndürülecek veya değiştirilebilen satırların bir alt kümesini seçmek için filtrelemeyi kullanın. Aşağıdaki örneklerde görüldüğü gibi performans veya söz diziminde fark yoktur.
.filter() yöntemini kullanma
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, 50'den fazla sayıyla Bu satırları DataFrame'de görüntülemek için Apache Spark
.filter()
yöntemini kullanır.Piton
display(df.filter(df["Count"] > 50))
Scala programlama dili
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
.where() yöntemini kullanma
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, 50'den fazla sayıyla Bu satırları DataFrame'de görüntülemek için Apache Spark
.where()
yöntemini kullanır.Piton
display(df.where(df["Count"] > 50))
Scala programlama dili
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
DataFrame'den sütunları seçme ve sıklık ölçütüne göre sıralama
Döndürülecek DataFrame sütunlarını belirtmek için select()
yöntemiyle hangi bebek adı sıklığı hakkında bilgi edinin. Sonuçları sıralamak için Apache Spark orderby
ve desc
işlevlerini kullanın.
Apache Spark için pyspark.sql modülü SQL işlevleri için destek sağlar. Bu öğreticide kullandığımız bu işlevler arasında Apache Spark orderBy()
, desc()
ve expr()
işlevleri yer alır. Gerektiğinde bunları oturumunuza aktararak bu işlevlerin kullanımını etkinleştirirsiniz.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod işlevi içeri aktarır
desc()
ve en yaygın adları ve bunların sayılarını azalan sırada görüntülemek için Apacheselect()
Spark yöntemini ve Apache SparkorderBy()
desc()
ve işlevlerini kullanır.Piton
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala programlama dili
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
DataFrame alt kümesi oluşturma
Mevcut bir DataFrame'den bir dataframe alt kümesi oluşturmayı öğrenin.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, verileri yıla, sayıya ve cinsiyete göre kısıtlayan yeni bir DataFrame oluşturmak için Apache Spark
filter
yöntemini kullanır. Sütunları sınırlamak için Apache Sparkselect()
yöntemini kullanır. Ayrıca Apache SparkorderBy()
vedesc()
işlevlerini kullanarak yeni DataFrame'i sayıya göre sıralar.Piton
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
Scala programlama dili
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
5. Adım: DataFrame'i kaydetme
DataFrame'i kaydetmeyi öğrenin. DataFrame'inizi bir tabloya kaydedebilir veya DataFrame'i bir dosyaya veya birden çok dosyaya yazabilirsiniz.
DataFrame'i tabloya kaydetme
Azure Databricks varsayılan olarak tüm tablolar için Delta Lake biçimini kullanır. DataFrame'inizi kaydetmek için katalogda ve şemada CREATE
tablo ayrıcalıklarına sahip olmanız gerekir.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, bu öğreticinin başında tanımladığınız değişkeni kullanarak DataFrame'in içeriğini bir tabloya kaydeder.
Piton
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
Scala programlama dili
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
Apache Spark uygulamalarının çoğu büyük veri kümelerinde ve dağıtılmış bir şekilde çalışır. Apache Spark, tek bir dosya yerine dosyaların dizinini yazar. Delta Lake, Parquet klasörlerini ve dosyalarını böler. Birçok veri sistemi bu dosya dizinlerini okuyabilir. Azure Databricks, çoğu uygulama için dosya yolları üzerinden tabloların kullanılmasını önerir.
DataFrame'i JSON dosyalarına kaydetme
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, DataFrame'i JSON dosyalarının dizinine kaydeder.
Piton
df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala programlama dili
df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
JSON dosyasından DataFrame'i okuma
Apache Spark spark.read.format()
yöntemini kullanarak bir dizinden DataFrame'e JSON verilerini okumayı öğrenin.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, önceki örnekte kaydettiğiniz JSON dosyalarını görüntüler.
Piton
display(spark.read.format("json").json("/tmp/json_data"))
Scala programlama dili
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
Ek görevler: PySpark, Scala ve R'de SQL sorguları çalıştırma
Apache Spark DataFrames, SQL'i PySpark, Scala ve R ile birleştirmek için aşağıdaki seçenekleri sağlar. Aşağıdaki kodu bu öğretici için oluşturduğunuz not defterinde çalıştırabilirsiniz.
Sütunu SQL sorgusu olarak belirtme
Apache Spark selectExpr()
yöntemini kullanmayı öğrenin. Bu, SQL ifadelerini kabul eden ve güncelleştirilmiş bir DataFrame döndüren yöntemin bir değişkenidir select()
. Bu yöntem, gibi upper
bir SQL ifadesi kullanmanıza olanak tanır.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, bir dize sütununu büyük harfe dönüştürmek (ve sütunu yeniden adlandırmak) için Apache Spark
selectExpr()
yöntemini ve SQLupper
ifadesini kullanır.Piton
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala programlama dili
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
Bir sütunda SQL söz dizimini kullanmak için expr()
kullanın.
Bir sütunun belirtileceği her yerde SQL söz dizimini kullanmak için Apache Spark expr()
işlevini içeri aktarmayı ve kullanmayı öğrenin.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod
expr()
işlevini içeri aktarır ve apache Sparkexpr()
işlevini ve SQLlower
ifadesini kullanarak bir dize sütununu küçük harfe dönüştürür (ve sütunu yeniden adlandırır).Piton
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))
Scala programlama dili
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
spark.sql() işlevini kullanarak rastgele bir SQL sorgusu çalıştırma
Rastgele SQL sorguları çalıştırmak için Apache Spark spark.sql()
işlevini kullanmayı öğrenin.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, SQL söz dizimini kullanarak bir SQL tablosunu sorgulamak için Apache Spark
spark.sql()
işlevini kullanır.Piton
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
Scala programlama dili
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
DataFrame öğreticisi not defterleri
Aşağıdaki not defterleri bu öğreticideki örnek sorguları içerir.