Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Bu makalede, PySpark kullanımını göstermek için basit örnekler gösterilmektedir. Temel Apache Spark kavramlarını anladığınızı ve işleme bağlı bir Azure Databricks not defterinde komut çalıştırdığınızı varsayar. Örnek verileri kullanarak DataFrame'ler oluşturur, bu verilerde satır ve sütun işlemleri dahil temel dönüştürmeler gerçekleştirir, birden çok DataFrame'i birleştirir ve bu verileri toplar, bu verileri görselleştirir ve sonra bir tabloya veya dosyaya kaydedersiniz.
Veri yükle
Bu makaledeki bazı örneklerde Databricks tarafından sağlanan örnek veriler kullanılarak DataFrame'lerin veri yükleme, dönüştürme ve kaydetme işlemleri gösterilmektedir. Henüz Databricks'te olmayan kendi verilerinizi kullanmak istiyorsanız, önce bu verileri karşıya yükleyebilir ve bu veriden bir DataFrame oluşturabilirsiniz. Bkz. Dosya yükleme ve Dosyaları Unity Kataloğu birimine yükleme kullanarak tablo oluşturma veya değiştirme.
Databricks örnek verileri hakkında
Databricks, katalogda samples ve dizinde /databricks-datasets örnek veriler sağlar.
- Katalogdaki
samplesörnek verilere erişmek için biçiminisamples.<schema-name>.<table-name>kullanın. Bu makalede, şemadakisamples.tpchtablolar kullanılır ve bunlar kurgusal bir işletmeden alınan verileri içerir. Tablodacustomermüşteriler hakkında bilgiler veordersbu müşteriler tarafından verilen siparişler hakkında bilgiler yer alır. - içindeki
dbutils.fs.lsverileri keşfetmek için kullanın/databricks-datasets. Dosya yollarını kullanarak bu konumdaki verileri sorgulamak için Spark SQL veya DataFrames kullanın. Databricks tarafından sağlanan örnek veriler hakkında daha fazla bilgi edinmek için bkz. Örnek veri kümeleri.
Veri türlerini içeri aktarma
Birçok PySpark işlemi, SQL işlevlerini kullanmanızı veya yerel Spark türleriyle etkileşim kurmanızı gerektirir. Yalnızca ihtiyacınız olan işlevleri ve türleri doğrudan içeri aktarın veya Python yerleşik işlevlerini geçersiz kılmamak için ortak bir diğer ad kullanarak bu modülleri içeri aktarın.
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
# import modules using an alias
import pyspark.sql.types as T
import pyspark.sql.functions as F
Veri türlerinin kapsamlı bir listesi için bkz. PySpark Veri Türleri.
PySpark SQL işlevlerinin kapsamlı listesi için bkz. PySpark İşlevleri.
DataFrame oluşturma
DataFrame oluşturmanın birkaç yolu vardır. Genellikle tablo veya dosya koleksiyonu gibi bir veri kaynağında DataFrame tanımlarsınız. Ardından Apache Spark temel kavramları bölümünde açıklandığı gibi, yürütülecek dönüştürmeleri tetikleme amacıyla gibi displaybir eylem kullanın. yöntemi DataFrame'ler display üretir.
Belirtilen değerlerle DataFrame oluşturma
Belirtilen değerlere sahip bir DataFrame oluşturmak için, satırların createDataFrame tanımlama grubu listesi olarak ifade edildiği yöntemini kullanın:
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)
Çıkışta sütunlarının df_children veri türlerinin otomatik olarak çıkarıldığını fark edin. Alternatif olarak, bir şema ekleyerek türleri belirtebilirsiniz. Şemalar, adı, veri türünü ve null değer içerip içermediklerini belirten boole bayrağından oluşan StructType kullanılarak tanımlanırStructFields. veri türlerini uygulamasından pyspark.sql.typesiçeri aktarmanız gerekir.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)
Unity Kataloğu'ndaki bir tablodan DataFrame oluşturma
Unity Kataloğu'ndaki bir tablodan DataFrame oluşturmak için biçimini tablekullanarak tabloyu tanımlayan yöntemini kullanın<catalog-name>.<schema-name>.<table-name>. Tablonuza gitmek için Katalog Gezgini'ni kullanmak için sol gezinti çubuğunda Katalog'a tıklayın. Tablo yolunu not defterine eklemek için tablo yolunu kopyala'yı seçin.
Aşağıdaki örnek, tablosunu samples.tpch.customeryükler, ancak alternatif olarak kendi tablonuzun yolunu sağlayabilirsiniz.
df_customer = spark.table('samples.tpch.customer')
display(df_customer)
Karşıya yüklenen bir dosyadan DataFrame oluşturma
Unity Kataloğu birimlerine yüklediğiniz bir dosyadan DataFrame oluşturmak için özelliğini kullanın read . Bu yöntem, uygun biçimi okumak için kullanabileceğiniz bir DataFrameReaderdöndürür. Soldaki küçük kenar çubuğundaki katalog seçeneğine tıklayın ve dosyanızı bulmak için katalog tarayıcısını kullanın. Seçin ve ardından Birim dosya yolunu kopyala'ya tıklayın.
Aşağıdaki örnek bir *.csv dosyadan okunur, ancak DataFrameReader dosyaları başka birçok biçimde karşıya yüklemeyi destekler. Bkz. DataFrameReader yöntemleri.
# Assign this variable your full volume file path
volume_file_path = ""
df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)
Unity Kataloğu birimleri hakkında daha fazla bilgi için bkz . Unity Kataloğu birimleri nedir?.
JSON yanıtından DataFrame oluşturma
REST API tarafından döndürülen bir JSON yanıt yükünden DataFrame oluşturmak için Python paketini kullanarak requests yanıtı sorgulayıp ayrıştırın. Kullanmak için paketi içeri aktarmanız gerekir. Bu örnek, Amerika Birleşik Devletleri Gıda ve İlaç Dairesi'nin ilaç uygulama veritabanındaki verileri kullanır.
import requests
# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)
# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)
Databricks'te JSON ve diğer yarı yapılandırılmış verilerle çalışma hakkında bilgi için bkz . Yarı yapılandırılmış verileri modelleme.
JSON alanı veya nesnesi seçme
Dönüştürülen JSON'dan belirli bir alanı veya nesneyi seçmek için gösterimi kullanın [] . Örneğin, bir ürün dizisi olan alanı seçmek products için:
display(df_drugs.select(df_drugs["products"]))
Ayrıca, birden çok alanda çapraz geçiş yapmak için yöntem çağrılarını birbirine zincirleyebilirsiniz. Örneğin, bir ilaç uygulamasındaki ilk ürünün marka adını çıkarmak için:
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
Bir dosyadan DataFrame oluşturma
Bir dosyadan DataFrame oluşturmayı göstermek için bu örnek dizindeki /databricks-datasets CSV verilerini yükler.
Örnek veri kümelerine gitmek için Databricks Utilties dosya sistemi komutlarını kullanabilirsiniz. Aşağıdaki örnek, içinde kullanılabilen veri kümelerini listelemek dbutilsiçin kullanır/databricks-datasets:
display(dbutils.fs.ls('/databricks-datasets'))
Alternatif olarak, aşağıdaki örnekte gösterildiği gibi Databricks CLI dosya sistemi komutlarına%fsiçin kullanabilirsiniz:
%fs ls '/databricks-datasets'
Bir dosyadan veya dosya dizininden DataFrame oluşturmak için yönteminde load yolu belirtin:
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)
DataFrame'lerle verileri dönüştürme
DataFrame'ler, verileri sıralamak, filtrelemek ve toplamak için yerleşik yöntemleri kullanarak verileri dönüştürmeyi kolaylaştırır. Birçok dönüştürme DataFrame'lerde yöntem olarak belirtilmez, bunun yerine pakette pyspark.sql.functions sağlanır. Bkz. Databricks PySpark SQL İşlevleri.
Sütun işlemleri
Spark birçok temel sütun işlemi sağlar:
Tip
DataFrame'deki tüm sütunların çıktısını almak için kullanın, columnsörneğin df_customer.columns.
Sütunları seç
ve selectkullanarak col belirli sütunları seçebilirsiniz.
col işlevi alt modüldedirpyspark.sql.functions.
from pyspark.sql.functions import col
df_customer.select(
col("c_custkey"),
col("c_acctbal")
)
Dize olarak tanımlanan bir ifadeyi alan bir sütuna expr da başvurabilirsiniz:
from pyspark.sql.functions import expr
df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)
SQL ifadelerini kabul eden öğesini de kullanabilirsiniz selectExpr:
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)
Dize değişmez değeri kullanarak sütunları seçmek için aşağıdakileri yapın:
df_customer.select(
"c_custkey",
"c_acctbal"
)
Belirli bir DataFrame'den bir sütunu açıkça seçmek için işlecini [] veya işlecini . kullanabilirsiniz. (İşleç . , tamsayı ile başlayan sütunları veya boşluk ya da özel karakter içeren sütunları seçmek için kullanılamaz.) Bu, özellikle bazı sütunların aynı ada sahip olduğu DataFrame'lere katılırken yararlı olabilir.
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)
Sütunlar oluştur
Yeni bir sütun oluşturmak için yöntemini kullanın withColumn . Aşağıdaki örnek, müşteri hesabı bakiyesinin c_acctbal değerini aşıp aşmadığına 1000bağlı olarak boole değeri içeren yeni bir sütun oluşturur:
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
Sütunları yeniden adlandırma
Bir sütunu yeniden adlandırmak için mevcut ve yeni sütun adlarını kabul eden yöntemini kullanın withColumnRenamed :
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
Yöntemi alias özellikle sütunlarınızı toplamaların bir parçası olarak yeniden adlandırmak istediğinizde yararlıdır:
from pyspark.sql.functions import avg
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)
display(df_segment_balance)
Sütun türlerini atama
Bazı durumlarda DataFrame'inizdeki bir veya daha fazla sütunun veri türünü değiştirmek isteyebilirsiniz. Bunu yapmak için yöntemini kullanarak cast sütun veri türleri arasında dönüştürme yapın. Aşağıdaki örnekte, bir sütuna başvurmak için yöntemini kullanarak col bir sütunun tamsayıdan dize türüne nasıl dönüştürüldüğü gösterilmektedir:
from pyspark.sql.functions import col
df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))
Sütunları kaldır
Sütunları kaldırmak için, seçme sırasında sütunları atlayabilir veya select(*) except yöntemini kullanabilirsiniz drop :
df_customer_flag_renamed.drop("balance_flag_renamed")
Aynı anda birden çok sütun da bırakabilirsiniz:
df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")
Satır işlemleri
Spark birçok temel satır işlemi sağlar:
- Satırları filtrele
- Yinelenen satırları kaldırma
- Null değerleri işleme
- Satırları ekleme
- Satırları sıralama
- Satırları filtrele
Satırları filtreleme
Satırları filtrelemek için DataFrame'de veya filter yöntemini kullanarak where yalnızca belirli satırları döndürebilirsiniz. Filtre uygulanacak bir sütunu tanımlamak için yöntemini veya sütunu değerlendiren bir ifadeyi kullanın col .
from pyspark.sql.functions import col
df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
Birden çok koşula göre filtrelemek için mantıksal işleçler kullanın. Örneğin, & sırasıyla | ve AND koşullarına OR izin vermenizi sağlar. Aşağıdaki örnek, değerine eşit c_nationkey ve 20 değerinden c_acctbal1000büyük olan satırları filtreler.
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
Yinelenen satırları kaldırma
Satırların yinelenenlerini silmek için yalnızca distinctbenzersiz satırları döndüren kullanın.
df_unique = df_customer.distinct()
Null değerleri işleme
Null değerleri işlemek için yöntemini kullanarak na.drop null değerler içeren satırları bırakın. Bu yöntem, null değerler veya any null değerler içeren all satırları bırakmak isteyip istemediğinizi belirtmenize olanak tanır.
Null değerleri bırakmak için aşağıdaki örneklerden birini kullanın.
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")
Bunun yerine yalnızca tüm null değerleri içeren satırları filtrelemek istiyorsanız aşağıdakileri kullanın:
df_customer_no_nulls = df_customer.na.drop("all")
Aşağıda gösterildiği gibi bunu belirterek sütunların bir alt kümesi için uygulayabilirsiniz:
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
Eksik değerleri doldurmak için yöntemini kullanın fill . Bunu tüm sütunlara veya sütunların bir alt kümesine uygulamayı seçebilirsiniz. Aşağıdaki örnekte, hesap bakiyesi için null değere sahip hesap bakiyeleri c_acctbal ile 0doldurulur.
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
Dizeleri diğer değerlerle değiştirmek için yöntemini kullanın replace . Aşağıdaki örnekte, tüm boş adres dizeleri sözcüğüyle UNKNOWNdeğiştirilir:
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
Satırları ekle
Satırları eklemek için yöntemini kullanarak union yeni bir DataFrame oluşturmanız gerekir. Aşağıdaki örnekte, daha önce oluşturulan ve df_that_one_customer birleştirilen DataFramedf_filtered_customer, üç müşteriyle bir DataFrame döndürür:
df_appended_rows = df_that_one_customer.union(df_filtered_customer)
display(df_appended_rows)
Note
DataFrame'leri bir tabloya yazıp yeni satırlar ekleyerek de birleştirebilirsiniz. Üretim iş yükleri için veri kaynaklarının hedef tabloya artımlı olarak işlenmesi, verilerin boyutu büyüdükçe gecikme süresini ve işlem maliyetlerini önemli ölçüde azaltabilir. Bkz. Lakeflow Connect'te standart bağlayıcılar.
Satırları sıralama
Important
Sıralama büyük ölçekte pahalı olabilir ve sıralanmış verileri depolar ve Spark ile verileri yeniden yüklerseniz sıralama garanti edilmemektedir. Sıralama kullanımınızda kasıtlı olarak kullandığınızdan emin olun.
Satırları bir veya daha fazla sütuna göre sıralamak için veya sort yöntemini kullanınorderBy. Varsayılan olarak bu yöntemler artan düzende sıralanır:
df_customer.orderBy(col("c_acctbal"))
Azalan sırada filtrelemek için kullanın desc:
df_customer.sort(col("c_custkey").desc())
Aşağıdaki örnekte iki sütuna göre sıralama gösterilmektedir:
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
DataFrame sıralandıktan sonra döndürülecek satır sayısını sınırlamak için yöntemini kullanın limit . Aşağıdaki örnek yalnızca en iyi 10 sonuçları görüntüler:
display(df_sorted.limit(10))
DataFrame'leri Birleştirme
İki veya daha fazla DataFrame'i birleştirmek için yöntemini kullanın join . DataFrame'lerin (birleştirme türü) ve how (birleştirmenin temel alınacağı sütunlar) parametrelerinde on nasıl birleştirileceğini belirtebilirsiniz. Ortak birleştirme türleri şunlardır:
-
inner: Bu, yalnızca DataFrame'ler genelinde parametre içinoneşleşme olan satırları tutan bir DataFrame döndüren birleştirme türü varsayılandır. -
left: Bu, ilk belirtilen DataFrame'in tüm satırlarını ve yalnızca ikinci belirtilen DataFrame'den birinciyle eşleşen satırları tutar. -
outer: Dış birleşim, eşleşmeden bağımsız olarak her iki DataFrame'den tüm satırları korur.
Birleştirmeler hakkında ayrıntılı bilgi için bkz . Azure Databricks'te birleştirmelerle çalışma. PySpark'ta desteklenen birleştirmelerin listesi için bkz. DataFrame birleşimleri.
Aşağıdaki örnek, DataFrame'in her satırının orders DataFrame'den customers karşılık gelen satırla birleştirildiği tek bir DataFrame döndürür. İç birleşim kullanılır, beklenti her siparişin tam olarak bir müşteriye karşılık geliyor olmasıdır.
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)
display(df_joined)
Birden çok koşulda birleştirmek için ve gibi &| boole işleçlerini kullanarak sırasıyla ve ANDbelirtinOR. Aşağıdaki örnek, yalnızca değerinden o_totalpricebüyük satırları 500,000 filtrelemek için ek bir koşul ekler:
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)
display(df_complex_joined)
Verileri toplama
Sql'deki gibi GROUP BY bir DataFrame'deki verileri toplamak için yöntemini kullanarak groupBy gruplandırılacak sütunları ve agg toplamaları belirtme yöntemini belirtin. , , avgsumve max gibi minyaygın toplamaları uygulamasından pyspark.sql.functionsiçeri aktarabilirsiniz. Aşağıdaki örnekte pazar segmentlerine göre ortalama müşteri bakiyesi gösterilmektedir:
from pyspark.sql.functions import avg
# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_balance)
from pyspark.sql.functions import avg
# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_nation_balance)
Bazı toplamalar eylemlerdir ve bu da hesaplamaları tetikledikleri anlamına gelir. Bu durumda, sonuçların çıktısını almak için başka eylemler kullanmanız gerekmez.
DataFrame'deki satırları saymak için yöntemini kullanın count :
df_customer.count()
Çağrıları zincirleme
DataFrame'leri dönüştüren yöntemler DataFrame'ler döndürür ve Spark, eylemler çağrılana kadar dönüştürmeler üzerinde işlem yapmaz. Bu tembel değerlendirme kolaylık ve okunabilirlik için birden çok yöntemi zincirleyebileceğiniz anlamına gelir. Aşağıdaki örnekte filtreleme, toplama ve sıralama zincirleme gösterilmektedir:
from pyspark.sql.functions import count
df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)
display(df_chained)
DataFrame'inizi görselleştirme
Not defterindeki bir DataFrame'i görselleştirmek için, DataFrame'in sol üst kısmındaki tablonun yanındaki işaretine tıklayın + ve ardından DataFrame'inize göre bir veya daha fazla grafik eklemek için Görselleştirme'yi seçin. Görselleştirmelerle ilgili ayrıntılar için bkz. Databricks not defterlerindeki görselleştirmeler ve SQL düzenleyicisi.
display(df_order)
Databricks, ek görselleştirmeler gerçekleştirmek için Spark için pandas API'sini kullanmanızı önerir. , .pandas_api() Spark DataFrame için karşılık gelen pandas API'sine atamanızı sağlar. Daha fazla bilgi için bkz . Spark üzerinde Pandas API'si.
Verilerinizi kaydetme
Verilerinizi dönüştürdükten sonra, yöntemlerini kullanarak DataFrameWriter kaydedebilirsiniz. Bu yöntemlerin tam listesi DataFrameWriter'da bulunabilir. Aşağıdaki bölümlerde DataFrame'inizi tablo olarak ve veri dosyaları koleksiyonu olarak kaydetme gösterilmektedir.
DataFrame'inizi tablo olarak kaydetme
DataFrame'inizi Unity Kataloğu'nda tablo olarak kaydetmek için yöntemini kullanın write.saveAsTable ve yolu biçiminde <catalog-name>.<schema-name>.<table-name>belirtin.
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
DataFrame'inizi CSV olarak yazma
DataFrame'inizi biçimlendirecek şekilde yazmak için *.csv biçimini ve seçeneklerini belirterek yöntemini kullanın write.csv . Belirtilen yolda veriler varsa, varsayılan olarak yazma işlemi başarısız olur. Farklı bir işlem yapmak için aşağıdaki modlardan birini belirtebilirsiniz:
-
overwriteDataFrame içeriğiyle hedef yolda var olan tüm verilerin üzerine yazar. -
appendDataFrame'in içeriğini hedef yoldaki verilere ekler. -
ignorehedef yolda veriler varsa yazma işlemi sessizce başarısız olur.
Aşağıdaki örnek, DataFrame içeriğiyle verilerin üzerine CSV dosyaları olarak yazma işlemini gösterir:
# Assign this variable your file path
file_path = ""
(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)
Sonraki Adımlar
Databricks'te daha fazla Spark özelliğinden yararlanmak için bkz: