Yaygın veri yükleme desenleri

Otomatik Yükleyici, bir dizi yaygın veri alımı görevini basitleştirir. Bu hızlı başvuru çeşitli popüler desenler için örnekler sağlar.

Bulut nesne depolama alanından değişken olarak veri alma

Otomatik Yükleyici, desteklenen dosya kaynaklarından tüm verileri hedef tabloda tek VARIANT bir sütun olarak yükleyebilir. VARIANT Şemaya ve tür değişikliklerine esnek olduğundan ve veri kaynağında mevcut olan büyük/küçük harf duyarlılığını ve NULL değerlerini koruduğundan, bu desen çoğu alım senaryosu için sağlamdır. Ayrıntılar için bkz. Bulut nesne depolamasından değişken olarak veri alma.

Glob desenlerini kullanarak dizinleri veya dosyaları filtreleme

Glob desenleri, dosya yolunda belirtildiğinde dizinleri ve dosyaları filtrelemek için kullanılabilir.

Desen Açıklama
? Herhangi bir tek karakterle eşleşir
* Sıfır veya daha çok sayıda karakterle eşleşir
[abc] {a,b,c} karakter kümesindeki tek bir karakterle eşleşir.
[a-z] {a…z} karakter aralığındaki tek bir karakterle eşleşir.
[^a] {a} karakter kümesinden veya aralıktan olmayan tek bir karakterle eşleşir. ^ karakterinin açılış köşeli ayracının hemen sağında yer alması gerektiğini unutmayın.
{ab,cd} {ab, cd} dize kümesindeki bir dizeyle eşleşir.
{ab,c{de, fh}} {ab, cde, cfh} dize kümesindeki bir dizeyle eşleşir.

path Önek desenleri sağlamak için kullanın, örneğin:

Piton

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("/Volumes/catalog_name/schema_name/volume_name/*/files")

Scala programlama dili

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("/Volumes/catalog_name/schema_name/volume_name/*/files")

Sonek desenlerini açıkça sağlamak için seçeneğini pathGlobFilter kullanmanız gerekir. Yalnızca path bir ön ek filtresi sağlar. Örneğin, yalnızca png farklı soneklere sahip dosyaları içeren bir dizindeki dosyaları ayrıştırmak istiyorsanız aşağıdakileri yapabilirsiniz:

Piton

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load("/Volumes/catalog_name/schema_name/volume_name/path")

Scala programlama dili

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")

Not

Otomatik Yükleyici'nin varsayılan globbing davranışı, diğer Spark dosya kaynaklarının varsayılan davranışından farklıdır. Varsayılan Spark davranışıyla dosya kaynaklarına karşılık gelen globbing'i kullanmak için okuma işleminize .option("cloudFiles.useStrictGlobber", "true") ekleyin. Globbing hakkında daha fazla bilgi için aşağıdaki tabloya bakın:

Desen Dosya yolu Varsayılan globber Sıkı globber
/a/b /a/b/c/file.txt Evet Evet
/a/b /a/b_dir/c/file.txt Hayır Hayır
/a/b /a/b.txt Hayır Hayır
/a/b/ /a/b.txt Hayır Hayır
/a/*/c/ /a/b/c/file.txt Evet Evet
/a/*/c/ /a/b/c/d/file.txt Evet Evet
/a/*/c/ /a/b/x/y/c/file.txt Evet Hayır
/a/*/c /a/b/c_file.txt Evet Hayır
/a/*/c/ /a/b/c_file.txt Evet Hayır
/a/*/c/ /a/*/cookie/file.txt Evet Hayır
/a/b* /a/b.txt Evet Evet
/a/b* /a/b/file.txt Evet Evet
/a/{0.txt,1.txt} /a/0.txt Evet Evet
/a/*/{0.txt,1.txt} /a/0.txt Hayır Hayır
/a/b/[cde-h]/i/ /a/b/c/i/file.txt Evet Evet

Kolay ETL'yi etkinleştirme

Verilerinizi Delta Lake'e veri kaybetmeden getirmenin kolay bir yolu, aşağıdaki deseni kullanmak ve Otomatik Yükleyici ile şema çıkarımı sağlamaktır. Databricks, kaynak verilerinizin şeması değiştiğinde akışınızı otomatik olarak yeniden başlatması için bir Azure Databricks işinde aşağıdaki kodu çalıştırmanızı önerir. Varsayılan olarak, şema dize türleri olarak algılanır, ayrıştırma hataları (her şey dize olarak kalırsa hiçbiri olmamalıdır) _rescued_data'e gider ve yeni sütunlar akışı başarısızlığa uğratır ve şemayı geliştirecek şekilde evrim geçirir.

Piton

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("/Volumes/catalog_name/schema_name/volume_name/source_data") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target>")

Scala programlama dili

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("/Volumes/catalog_name/schema_name/volume_name/source_data")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target>")

İyi yapılandırılmış verilerde veri kaybını önleme

Şemanızı bildiğiniz halde beklenmeyen verileri yakalamak istediğinizde Databricks, kullanılmasını rescuedDataColumnönerir.

Piton

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("/Volumes/catalog_name/schema_name/volume_name/source_data") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target>")

Scala programlama dili

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("/Volumes/catalog_name/schema_name/volume_name/source_data")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target>")

Şemanızla eşleşmeyen yeni bir alan kullanıma sunulduğunda akışınızın işlemeyi durdurmasını istiyorsanız şunları ekleyebilirsiniz:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Esnek yarı yapılandırılmış veri işlem hatlarını etkinleştirme

Bir satıcıdan sağladığı bilgilere yeni sütunlar ekleyen veriler aldığınızda, tam olarak ne zaman yaptıklarının farkında olmayabilir veya veri işlem hattınızı güncelleştirmek için bant genişliğine sahip olmayabilirsiniz. Artık akışı yeniden başlatmak ve Otomatik Yükleyici'nin çıkardığınız şemayı otomatik olarak güncelleştirmesini sağlamak için şema evrimini kullanabilirsiniz. Ayrıca, satıcının sağladığı "şemasız" alanlardan bazıları için schemaHints de kullanabilirsiniz.

Piton

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/Volumes/catalog_name/schema_name/volume_name/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target>")

Scala programlama dili

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/Volumes/catalog_name/schema_name/volume_name/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target>")

İç içe JSON verilerini dönüştürme

Otomatik Yükleyici en üst düzey JSON sütunlarını dize olarak çıkardığından, daha fazla dönüştürme gerektiren iç içe JSON nesneleriyle baş başa kalabilirsiniz. Karmaşık JSON içeriğini daha da dönüştürmek için yarı yapılandırılmış veri erişim API'lerini kullanabilirsiniz.

Piton

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("/Volumes/catalog_name/schema_name/volume_name/nested_json") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala programlama dili

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("/Volumes/catalog_name/schema_name/volume_name/nested_json")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

İç içe JSON verilerini anlama

İç içe geçmiş verileriniz olduğunda, verilerinizin ve diğer sütun türlerinin iç içe yapısını çıkarsamak için cloudFiles.inferColumnTypes seçeneğini kullanabilirsiniz.

Piton

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("/Volumes/catalog_name/schema_name/volume_name/nested_json")

Scala programlama dili

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("/Volumes/catalog_name/schema_name/volume_name/nested_json")

CSV dosyalarını üst bilgi olmadan yükleme

Aşağıdaki örnekte, Otomatik Yükleyici kullanılarak üst bilgi olmadan CSV dosyalarının nasıl yüklenecekleri gösterilmektedir. Sağlanan şemayla eşleşmeyen verileri yakalamak için kullanın rescuedDataColumn .

Piton

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # ensure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala programlama dili

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Üst bilgi içeren CSV dosyalarında şemayı zorunlu kılma

Aşağıdaki örnek, üst bilgi içeren CSV dosyalarında şemanın nasıl uygulandığını açıklamaktadır. Sağlanan şemayla eşleşmeyen verileri yakalamak için kullanın rescuedDataColumn .

Piton

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala programlama dili

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Görüntü veya ikili verileri ML için Delta Lake'e alma

Veriler Delta Lake'te depolandıktan sonra veriler üzerinde dağıtılmış çıkarım çalıştırabilirsiniz. Bkz. Pandas UDF kullanarak dağıtılmış çıkarım gerçekleştirme.

Piton

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("/Volumes/catalog_name/schema_name/volume_name/images") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target>")

Scala programlama dili

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("/Volumes/catalog_name/schema_name/volume_name/images")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target>")

Lakeflow Spark Bildirimli İşlem Hatları için Otomatik Yükleyici sentaksı

Lakeflow Spark Bildirimli İşlem Hatları, Otomatik Yükleyici için biraz değiştirilmiş Python söz dizimi sağlar ve Otomatik Yükleyici için SQL desteği ekler. Aşağıdaki örneklerde Wanderbricks örnek seyahat rezervasyon veri kümesini kullanarak JSON dosyalarından veri kümeleri oluşturmak için Otomatik Yükleyici kullanılır:

Piton

@dp.table
def booking_updates():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("multiLine", "true")
      .load("/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates")
  )

@dp.table
def reviews():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("multiLine", "true")
      .load("/Volumes/my_catalog/my_schema/my_volume/wanderbricks/reviews")
  )

SQL

CREATE OR REFRESH STREAMING TABLE booking_updates
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates",
  format => "json",
  multiLine => true
)

CREATE OR REFRESH STREAMING TABLE reviews
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/my_schema/my_volume/wanderbricks/reviews",
  format => "json",
  multiLine => true
)

Otomatik Yükleyici için desteklenen biçim seçeneklerini kullanabilirsiniz. read_files için seçenekler anahtar-değer çiftleridir. Desteklenen biçimler ve seçenekler hakkında ayrıntılı bilgi için bkz. Seçenekler.

CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT *
  FROM STREAM read_files(
    "/Volumes/my_volume/path/to/files/*",
    option-key => option-value,
    ...
  )

Aşağıdaki örnek, sütun türü çıkarımı etkinleştirilmiş çok satırlı JSON dosyalarını okur:

CREATE OR REFRESH STREAMING TABLE booking_updates
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates",
  format => "json",
  multiLine => true,
  inferColumnTypes => true
)

Verilen biçimi el ile belirtmek için schema kullanabilirsiniz; schema desteklemeyen biçimler için belirtmelisiniz.

Piton

@dp.table
def booking_updates_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("booking_id LONG, booking_update_id LONG, user_id LONG, property_id LONG, status STRING, guests_count INT, total_amount DOUBLE, check_in DATE, check_out DATE, created_at TIMESTAMP, updated_at TIMESTAMP")
      .option("cloudFiles.format", "json")
      .option("multiLine", "true")
      .load("/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates")
  )

SQL

CREATE OR REFRESH STREAMING TABLE booking_updates_raw
AS SELECT *
FROM STREAM read_files(
  "/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates",
  format => "json",
  multiLine => true,
  schema => "booking_id LONG, booking_update_id LONG, user_id LONG, property_id LONG, status STRING, guests_count INT, total_amount DOUBLE, check_in DATE, check_out DATE, created_at TIMESTAMP, updated_at TIMESTAMP"
)

Not

Lakeflow Spark Bildirimli İşlem Hatları, dosyaları okumak için Otomatik Yükleyici kullanılırken şema ve denetim noktası dizinlerini otomatik olarak yapılandırır ve yönetir. Ancak, bu dizinlerden birini el ile yapılandırıyorsanız, tam yenileme gerçekleştirmek yapılandırılan dizinlerin içeriğini etkilemez. Databricks, işleme sırasında beklenmeyen yan etkileri önlemek için otomatik olarak yapılandırılmış dizinlerin kullanılmasını önerir.

Sonraki Adımlar