Yaygın veri yükleme desenleri

Otomatik Yükleyici, bir dizi yaygın veri alımı görevini basitleştirir. Bu hızlı başvuru çeşitli popüler desenler için örnekler sağlar.

Glob desenlerini kullanarak dizinleri veya dosyaları filtreleme

Glob desenleri, yolda sağlandığında dizinleri ve dosyaları filtrelemek için kullanılabilir.

Desen Tanım
? Herhangi bir tek karakterle eşleşir
* Sıfır veya daha çok sayıda karakterle eşleşir
[abc] {a,b,c} karakter kümesindeki tek bir karakterle eşleşir.
[a-z] {a... karakter aralığındaki tek bir karakterle eşleşir. z}.
[^a] {a} karakter kümesinden veya aralıktan olmayan tek bir karakterle eşleşir. Karakterin ^ , açılış köşeli ayraçlarının hemen sağ kısmında yer alması gerektiğini unutmayın.
{ab,cd} {ab, cd} dize kümesindeki bir dizeyle eşleşir.
{ab,c{de, fh}} {ab, cde, cfh} dize kümesindeki bir dizeyle eşleşir.

path Ön ek desenleri sağlamak için kullanın, örneğin:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Önemli

Sonek desenlerini açıkça sağlamak için seçeneğini pathGlobFilter kullanmanız gerekir. Yalnızca path bir ön ek filtresi sağlar.

Örneğin, yalnızca png farklı soneklere sahip dosyaları içeren bir dizindeki dosyaları ayrıştırmak isterseniz şunları yapabilirsiniz:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Dekont

Otomatik Yükleyici'nin varsayılan globbing davranışı, diğer Spark dosya kaynaklarının varsayılan davranışından farklıdır. Dosya kaynaklarında varsayılan Spark davranışıyla eşleşen globbing'i kullanmak için okumanıza ekleyin .option("cloudFiles.useStrictGlobber", "true") . Globbing hakkında daha fazla bilgi için aşağıdaki tabloya bakın:

Desen Dosya yolu Varsayılan globber Sıkı globber
/a/b /a/b/c/file.txt Evet Evet
/a/b /a/b_dir/c/file.txt Hayır Hayır
/a/b /a/b.txt Hayır Hayır
/a/b/ /a/b.txt Hayır Hayır
/a/*/c/ /a/b/c/file.txt Evet Evet
/a/*/c/ /a/b/c/d/file.txt Evet Evet
/a/*/c/ /a/b/x/y/c/file.txt Evet Hayır
/a/*/c /a/b/c_file.txt Evet Hayır
/a/*/c/ /a/b/c_file.txt Evet Hayır
/a/*/c/ /a/*/cookie/file.txt Evet Hayır
/a/b* /a/b.txt Evet Evet
/a/b* /a/b/file.txt Evet Evet
/a/{0.txt,1.txt} /a/0.txt Evet Evet
/a/*/{0.txt,1.txt} /a/0.txt Hayır Hayır
/a/b/[cde-h]/i/ /a/b/c/i/file.txt Evet Evet

Kolay ETL'yi etkinleştirme

Verilerinizi Delta Lake'e veri kaybetmeden getirmenin kolay bir yolu, aşağıdaki deseni kullanmak ve Otomatik Yükleyici ile şema çıkarımı sağlamaktır. Databricks, kaynak verilerinizin şeması değiştiğinde akışınızı otomatik olarak yeniden başlatması için bir Azure Databricks işinde aşağıdaki kodu çalıştırmanızı önerir. Varsayılan olarak, şema dize türleri olarak çıkarılır, ayrıştırma hataları (her şey dize olarak kalırsa hiçbiri olmamalıdır) öğesine _rescued_datagider ve yeni sütunlar akışı başarısız olur ve şemayı geliştirebilir.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

İyi yapılandırılmış verilerde veri kaybını önleme

Şemanızı bildiğinizde ancak beklenmeyen veriler aldığınızda bilmek istediğinizde, Databricks komutunu rescuedDataColumnkullanmanızı önerir.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Şemanızla eşleşmeyen yeni bir alan kullanıma sunulduğunda akışınızın işlemeyi durdurmasını istiyorsanız şunları ekleyebilirsiniz:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Esnek yarı yapılandırılmış veri işlem hatlarını etkinleştirme

Bir satıcıdan sağladığı bilgilere yeni sütunlar ekleyen veriler aldığınızda, tam olarak ne zaman yaptıklarının farkında olmayabilir veya veri işlem hattınızı güncelleştirmek için bant genişliğine sahip olmayabilirsiniz. Artık akışı yeniden başlatmak ve Otomatik Yükleyici'nin çıkardığınız şemayı otomatik olarak güncelleştirmesini sağlamak için şema evrimini kullanabilirsiniz. Satıcının sağladığı "şemasız" alanlardan bazıları için de kullanabilirsiniz schemaHints .

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

İç içe JSON verilerini dönüştürme

Otomatik Yükleyici en üst düzey JSON sütunlarını dize olarak çıkardığından, daha fazla dönüştürme gerektiren iç içe JSON nesneleriyle baş başa kalabilirsiniz. Karmaşık JSON içeriğini daha da dönüştürmek için yarı yapılandırılmış veri erişim API'lerini kullanabilirsiniz.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

İç içe JSON verilerini çıkarsama

İç içe veriniz olduğunda, verilerinizin cloudFiles.inferColumnTypes ve diğer sütun türlerinin iç içe yapısını çıkarsamak için seçeneğini kullanabilirsiniz.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

CSV dosyalarını üst bilgi olmadan yükleme

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Üst bilgi içeren CSV dosyalarında şemayı zorunlu kılma

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Görüntü veya ikili verileri ML için Delta Lake'e alma

Veriler Delta Lake'te depolandıktan sonra veriler üzerinde dağıtılmış çıkarım çalıştırabilirsiniz. Bkz. Pandas UDF kullanarak dağıtılmış çıkarım gerçekleştirme.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

DLT için Otomatik Yükleyici söz dizimi

Delta Live Tables, Otomatik Yükleyici için biraz değiştirilmiş Python söz dizimi sağlar ve Otomatik Yükleyici için SQL desteği ekler.

Aşağıdaki örneklerde CSV ve JSON dosyalarından veri kümeleri oluşturmak için Otomatik Yükleyici kullanılır:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

Otomatik Yükleyici ile desteklenen biçim seçeneklerini kullanabilirsiniz. map() işlevini kullanarak yöntemine cloud_files() seçenekleri geçirebilirsiniz. Seçenekler anahtar-değer çiftleridir ve burada anahtarlar ve değerler dizelerdir. Aşağıda, SQL'de Otomatik Yükleyici ile çalışmaya yönelik söz dizimi açıklanmaktadır:

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

Aşağıdaki örnek, sekmeyle ayrılmış CSV dosyalarındaki verileri üst bilgiyle okur:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

biçimini el ile belirtmek için kullanabilirsinizschema; şema çıkarımı desteklemeyen biçimler için belirtmelisinizschema:

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

Dekont

Delta Live Tables, dosyaları okumak için Otomatik Yükleyici kullanılırken şemayı ve denetim noktası dizinlerini otomatik olarak yapılandırıp yönetir. Ancak, bu dizinlerden birini el ile yapılandırıyorsanız, tam yenileme gerçekleştirmek yapılandırılan dizinlerin içeriğini etkilemez. Databricks, işleme sırasında beklenmeyen yan etkileri önlemek için otomatik olarak yapılandırılmış dizinlerin kullanılmasını önerir.