Yaygın veri yükleme desenleri

Otomatik Yükleyici, bir dizi yaygın veri alımı görevini basitleştirir. Bu hızlı başvuru çeşitli popüler desenler için örnekler sağlar.

Glob desenlerini kullanarak dizinleri veya dosyaları filtreleme

Glob desenleri, yolda sağlandığında dizinleri ve dosyaları filtrelemek için kullanılabilir.

Desen Açıklama
? Herhangi bir tek karakterle eşleşir
* Sıfır veya daha çok sayıda karakterle eşleşir
[abc] {a,b,c} karakter kümesindeki tek bir karakterle eşleşir.
[a-z] {a... karakter aralığındaki tek bir karakterle eşleşir. z}.
[^a] {a} karakter kümesinden veya aralıktan olmayan tek bir karakterle eşleşir. Karakterin ^ , sol köşeli ayraç sağda hemen yer alması gerektiğini unutmayın.
{ab,cd} {ab, cd} dize kümesindeki bir dizeyle eşleşir.
{ab,c{de, fh}} {ab, cde, cfh} dize kümesindeki bir dizeyle eşleşir.

path Ön ek desenleri sağlamak için kullanın, örneğin:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base_path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base_path>/*/files")

Önemli

Sonek desenlerini açıkça sağlamak için seçeneğini pathGlobFilter kullanmanız gerekir. Yalnızca path bir ön ek filtresi sağlar.

Örneğin, yalnızca png farklı soneklere sahip dosyalar içeren bir dizindeki dosyaları ayrıştırmak isterseniz şunları yapabilirsiniz:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base_path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base_path>)

Kolay ETL'yi etkinleştirme

Verilerinizi kaybetmeden Delta Lake'e almanın kolay bir yolu, aşağıdaki deseni kullanmak ve Otomatik Yükleyici ile şema çıkarımı sağlamaktır. Databricks, kaynak verilerinizin şeması değiştiğinde akışınızı otomatik olarak yeniden başlatması için bir Azure Databricks işinde aşağıdaki kodu çalıştırmanızı önerir. Varsayılan olarak, şema dize türleri olarak çıkarsanır, ayrıştırma hataları (her şey bir dize olarak kalırsa hiçbiri olmamalıdır) öğesine _rescued_datagider ve yeni sütunlar akışta başarısız olur ve şemayı geliştirer.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>")
  .load("<path_to_source_data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

İyi yapılandırılmış verilerde veri kaybını önleme

Şemanızı bildiğinizde ancak beklenmeyen veriler aldığınızda bilmek istediğinizde, Databricks komutunu kullanmanızı rescuedDataColumnönerir.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

Şemanızla eşleşmeyen yeni bir alan kullanıma sunulduğunda akışınızın işlemeyi durdurmasını istiyorsanız şunları ekleyebilirsiniz:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Esnek yarı yapılandırılmış veri işlem hatlarını etkinleştirme

Sağladığı bilgilere yeni sütunlar getiren bir satıcıdan veri aldığınızda, tam olarak ne zaman yaptıklarının farkında olmayabilirsiniz veya veri işlem hattınızı güncelleştirmek için bant genişliğine sahip olmayabilirsiniz. Artık akışı yeniden başlatmak ve Otomatik Yükleyici'nin çıkarsanan şemayı otomatik olarak güncelleştirmesine izin vermek için şema evrimini kullanabilirsiniz. Ayrıca, satıcının sağladığı "şemasız" alanlardan bazıları için de kullanabilirsiniz schemaHints .

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

İç içe JSON verilerini dönüştürme

Otomatik Yükleyici en üst düzey JSON sütunlarını dize olarak çıkarsadığından, daha fazla dönüştürme gerektiren iç içe JSON nesneleriyle kalabilirsiniz. Karmaşık JSON içeriğini daha fazla dönüştürmek için yarı yapılandırılmış veri erişim API'lerini kullanabilirsiniz.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>") \
  .load("<source_data_with_nested_json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .load("<source_data_with_nested_json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

İç içe JSON verilerini çıkarsama

İç içe veriniz olduğunda, verilerinizin cloudFiles.inferColumnTypes ve diğer sütun türlerinin iç içe yapısını çıkarsamak için seçeneğini kullanabilirsiniz.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source_data_with_nested_json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source_data_with_nested_json>")

CSV dosyalarını üst bilgi olmadan yükleme

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Üst bilgi içeren CSV dosyalarında şemayı zorunlu kılma

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

ML için Delta Lake'e görüntü veya ikili veri alma

Veriler Delta Lake'te depolandıktan sonra veriler üzerinde dağıtılmış çıkarım çalıştırabilirsiniz. Bkz . Pandas UDF kullanarak dağıtılmış çıkarım gerçekleştirme.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")