Aracılığıyla paylaş


Delta Live Tabloları ile veri yükleme

Delta Live Tablolarını kullanarak Azure Databricks'te Apache Spark tarafından desteklenen herhangi bir veri kaynağından veri yükleyebilirsiniz. Akış DataFrame'leri ve Spark DataFrame'ler için Pandas dahil olmak üzere Spark DataFrame döndüren tüm sorgular için Delta Live Tablolarında veri kümeleri (tablolar ve görünümler) tanımlayabilirsiniz. Veri alımı görevleri için Databricks çoğu kullanım örneğinde akış tablolarının kullanılmasını önerir. Akış tabloları, Otomatik Yükleyici kullanarak bulut nesne depolama alanından veya Kafka gibi ileti veri yollarından veri almak için iyidir. Aşağıdaki örneklerde bazı yaygın desenler gösterilmiştir.

Önemli

Tüm veri kaynaklarının SQL desteği yoktur. Sql ve Python not defterlerini bir Delta Live Tabloları işlem hattında karıştırarak sql'i alımdan sonraki tüm işlemler için kullanabilirsiniz.

Varsayılan olarak Delta Live Tablolarında paketlenmemiş kitaplıklarla çalışma hakkında ayrıntılı bilgi için bkz . Delta Live Tables işlem hatları için Python bağımlılıklarını yönetme.

Bulut nesne depolama alanından dosya yükleme

Databricks, bulut nesne depolama alanından veri alımı görevlerinin çoğu için Delta Live Tablolarla Otomatik Yükleyici'nin kullanılmasını önerir. Otomatik Yükleyici ve Delta Live Tabloları, sürekli büyüyen verileri bulut depolama alanına ulaştıktan sonra artımlı ve sürekli yüklenecek şekilde tasarlanmıştır. Aşağıdaki örneklerde CSV ve JSON dosyalarından veri kümeleri oluşturmak için Otomatik Yükleyici kullanılır:

Not

Unity Kataloğu etkin bir işlem hattında Otomatik Yükleyici ile dosya yüklemek için dış konumları kullanmanız gerekir. Unity Kataloğu'nu Delta Live Tabloları ile kullanma hakkında daha fazla bilgi edinmek için bkz. Delta Live Tabloları işlem hatlarınızla Unity Kataloğu'nu kullanma.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")

Bkz . Otomatik Yükleyici nedir? ve Otomatik Yükleyici SQL söz dizimi.

Uyarı

Dosya bildirimleriyle Otomatik Yükleyici'yi kullanır ve işlem hattınız veya akış tablonuz için tam yenileme çalıştırırsanız kaynaklarınızı el ile temizlemeniz gerekir. Temizleme gerçekleştirmek için bir not defterinde CloudFilesResourceManager'ı kullanabilirsiniz.

İleti veri yolundan veri yükleme

Delta Live Tables işlem hatlarını, akış tablolarıyla ileti veri kümelerinden veri almak için yapılandırabilirsiniz. Databricks, ileti veri kümelerinden düşük gecikme süreli yükleme için en verimli alımı sağlamak üzere akış tablolarını sürekli yürütme ve gelişmiş otomatik ölçeklendirme ile birleştirmenizi önerir. Bkz . Gelişmiş otomatik ölçeklendirme ile Delta Live Tables işlem hatlarının küme kullanımını iyileştirme.

Örneğin, aşağıdaki kod Kafka'dan veri almak için bir akış tablosu yapılandırır:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

Aşağıdaki örnekte olduğu gibi, bu veriler üzerinde akış dönüştürmeleri gerçekleştirmek için saf SQL'de aşağı akış işlemleri yazabilirsiniz:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

Event Hubs ile çalışma örneği için bkz . Azure Event Hubs'ı Delta Live Tables veri kaynağı olarak kullanma.

Bkz . Akış veri kaynaklarını yapılandırma.

Dış sistemlerden veri yükleme

Delta Live Tabloları, Azure Databricks tarafından desteklenen herhangi bir veri kaynağından veri yüklemeyi destekler. Bkz . Veri kaynaklarına bağlanma. Ayrıca, desteklenen veri kaynakları için Lakehouse Federation kullanarak dış verileri de yükleyebilirsiniz. Lakehouse Federation, Databricks Runtime 13.3 LTS veya üzerini gerektirdiğinden, Lakehouse Federasyonu'nı kullanmak için işlem hattınızın önizleme kanalını kullanacak şekilde yapılandırılması gerekir.

Bazı veri kaynaklarının SQL'de eşdeğer desteği yoktur. Lakehouse Federation'u bu veri kaynaklarından biriyle kullanamıyorsanız, kaynaktan veri almak için Python not defteri kullanabilirsiniz. Aynı Delta Live Tables işlem hattına Python ve SQL kaynak kodu ekleyebilirsiniz. Aşağıdaki örnek, uzak bir PostgreSQL tablosundaki verilerin geçerli durumuna erişmek için gerçekleştirilmiş bir görünüm bildirir:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Bulut nesne depolama alanından küçük veya statik veri kümeleri yükleme

Apache Spark yük söz dizimini kullanarak küçük veya statik veri kümelerini yükleyebilirsiniz. Delta Live Tables, Azure Databricks üzerinde Apache Spark tarafından desteklenen tüm dosya biçimlerini destekler. Tam liste için bkz . Veri biçimi seçenekleri.

Aşağıdaki örneklerde, Delta Live Tables tabloları oluşturmak için JSON yükleme gösterilmektedir:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Not

SELECT * FROM format.`path`; SQL yapısı, Azure Databricks'te tüm SQL ortamları için ortaktır. Delta Live Tables ile SQL kullanarak doğrudan dosya erişimi için önerilen desendir.

İşlem hattındaki gizli dizilerle depolama kimlik bilgilerine güvenli bir şekilde erişme

Erişim anahtarları veya parolalar gibi kimlik bilgilerini depolamak için Azure Databricks gizli dizilerini kullanabilirsiniz. İşlem hattınızdaki gizli diziyi yapılandırmak için işlem hattı ayarları küme yapılandırmasında bir Spark özelliği kullanın. Bkz . Delta Live Tables işlem hattı için işlem yapılandırma.

Aşağıdaki örnek, Otomatik Yükleyici kullanarak bir Azure Data Lake Storage 2. Nesil (ADLS 2. Nesil) depolama hesabından giriş verilerini okumak için gereken erişim anahtarını depolamak için bir gizli dizi kullanır. İşlem hattınızın gerektirdiği gizli dizileri (örneğin, S3'e erişmek için AWS anahtarları veya apache Hive meta veri deposu parolası) yapılandırmak için aynı yöntemi kullanabilirsiniz.

Azure Data Lake Storage 2. Nesil ile çalışma hakkında daha fazla bilgi edinmek için bkz. Azure Data Lake Storage 2. Nesil ve Blob Depolama'ya bağlanma.

Not

Gizli dizi değerini ayarlayan yapılandırma anahtarına spark_conf ön eki eklemeniz spark.hadoop. gerekir.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Replace

  • <storage-account-name> adls 2. Nesil depolama hesabı adıyla.
  • <scope-name> azure databricks gizli kapsam adıyla.
  • <secret-name> azure depolama hesabı erişim anahtarını içeren anahtarın adıyla.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Replace

  • <container-name> giriş verilerini depolayan Azure depolama hesabı kapsayıcısının adıyla.
  • <storage-account-name> adls 2. Nesil depolama hesabı adıyla.
  • <path-to-input-dataset> giriş veri kümesinin yolunu içerir.

Azure Event Hubs'dan veri yükleme

Azure Event Hubs, Apache Kafka uyumlu bir arabirim sağlayan bir veri akışı hizmetidir. Azure Event Hubs'dan ileti yüklemek için Delta Live Tables çalışma zamanına dahil edilen Yapılandırılmış Akış Kafka bağlayıcısını kullanabilirsiniz. Azure Event Hubs'dan iletileri yükleme ve işleme hakkında daha fazla bilgi edinmek için bkz . Azure Event Hubs'ı Delta Live Tables veri kaynağı olarak kullanma.