Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
İşlem hatlarını kullanarak Azure Databricks üzerinde Apache Spark tarafından desteklenen herhangi bir veri kaynağından veri yükleyebilirsiniz. Akış DataFrame'leri ve Spark DataFrame'ler için Pandas dahil olmak üzere Spark DataFrame döndüren tüm sorgular için Lakeflow Spark Bildirimli İşlem Hatlarında veri kümeleri (tablolar ve görünümler) tanımlayabilirsiniz. Databricks, veri alımı görevleri için çoğu kullanım örneğinde akış tablolarının kullanılmasını önerir. Akış tabloları, Auto Loader kullanarak bulut nesne depolama alanından veya Kafka gibi mesaj ağlarından veri almak için idealdir.
Uyarı
- Tüm veri kaynaklarının aktarım için SQL desteği bulunmamaktadır. Sql ve Python kaynaklarını işlem hatlarında birleştirerek Python'ı gerektiği yerde, SQL'i de aynı işlem hattındaki diğer işlemler için kullanabilirsiniz.
- Varsayılan olarak Lakeflow Spark Bildirimli İşlem Hatlarında paketlenmemiş kitaplıklarla çalışma hakkında ayrıntılı bilgi için bkz. İşlem hatları için Python bağımlılıklarını yönetme.
- Azure Databricks'te veri girişi hakkında genel bilgi için daha fazla bilgi edinmek üzere Lakeflow Connect'te standart bağlayıcılar bölümüne bakın.
Aşağıdaki örneklerde bazı yaygın desenler gösterilmiştir.
Var olan bir tablodan yükleme
Azure Databricks'teki mevcut tablolardan veri yükleme. Sorgu kullanarak verileri dönüştürebilir veya işlem hattınızda daha fazla işlem için tabloyu yükleyebilirsiniz.
Aşağıdaki örnek, mevcut bir tablodaki verileri okur:
Piton
@dp.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)
SQL
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
Dosyaları bulut nesne depolama alanından yükleme
Databricks, bulut nesne depolamasından veya Unity Kataloğu birimindeki dosyalardan veri alımı görevlerinin çoğu için işlem hatlarında Otomatik Yükleyici'nin kullanılmasını önerir. Auto Loader ve işlem hatları, sürekli büyüyen verileri bulut depolama alanına ulaşırken, artımlı ve değişmez bir şekilde yüklemek üzere tasarlanmıştır.
Bkz. Otomatik Yükleyici nedir? ve Nesne depolamadan veri yükleme.
Aşağıdaki örnek, Otomatik Yükleyici'yi kullanarak bulut depolamadan verileri okur:
Piton
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
)
SQL
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
format => "json"
);
Aşağıdaki örneklerde, Unity Kataloğu birimindeki CSV dosyalarından veri kümeleri oluşturmak için Otomatik Yükleyici kullanılır:
Piton
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/retail_org/customers/",
format => "csv"
)
Uyarı
- Dosya bildirimleriyle Otomatik Yükleyici'yi kullanır ve işlem hattınız veya akış tablonuz için tam yenileme çalıştırırsanız kaynaklarınızı el ile temizlemeniz gerekir. Temizleme gerçekleştirmek için not defterindeki CloudFilesResourceManager kullanabilirsiniz.
- Unity Kataloğu etkin bir işlem hattında Otomatik Yükleyici ile dosya yüklemek için dış konumları kullanmanız gerekir. Unity Kataloğu'nu işlem hatlarıyla kullanma hakkında daha fazla bilgi edinmek için bkz. Unity Kataloğu'nu işlem hatlarıyla kullanma.
İleti veri yolu'ndan veri yükleme
İleti veri hatlarından veri almak için işlem hatlarını yapılandırabilirsiniz. Databricks, ileti veri kümelerinden düşük gecikme süreli yükleme için en verimli alımı sağlamak üzere sürekli yürütme ve gelişmiş otomatik ölçeklendirme ile akış tablolarının kullanılmasını önerir. Bkz. Otomatik Ölçeklendirme ile Lakeflow Spark Bildirimli İşlem Hatlarının küme kullanımını iyileştirme.
Örneğin aşağıdaki kod, read_kafka işlevini kullanarak Kafka'dan veri almak için bir akış tablosu yapılandırır:
Piton
from pyspark import pipelines as dp
@dp.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'topic1'
);
Diğer ileti veri yolu kaynaklarından almak için aşağıdaki bilgilere bakınız:
- Kinesis: Read_Kinesis
- Pub/Sub konusu: read_pubsub
- Pulsar: read_pulsar
Azure Event Hubs'dan veri yükleme
Azure Event Hubs, Apache Kafka uyumlu bir arabirim sağlayan bir veri akışı hizmetidir. Azure Event Hubs'dan ileti yüklemek için Lakeflow Spark Bildirimli İşlem Hatları çalışma zamanına dahil edilen Yapılandırılmış Akış Kafka bağlayıcısını kullanabilirsiniz. Azure Event Hubs'dan iletileri yükleme ve işleme hakkında daha fazla bilgi edinmek için bkz. Azure Event Hubs'ı işlem hattı veri kaynağı olarak kullanma.
Dış sistemlerden veri yükleme
Lakeflow Spark Bildirimli İşlem Hatları, Azure Databricks tarafından desteklenen herhangi bir veri kaynağından veri yüklemeyi destekler. Bkz . Veri kaynaklarına ve dış hizmetlere bağlanma. Dış verileri, veri kaynaklarınıntarafından desteklendiği durumlarda Lakehouse Federasyonu kullanarak da yükleyebilirsiniz. Lakehouse Federation, Databricks Runtime 13.3 LTS veya üzerini gerektirdiğinden, Lakehouse Federation'ı kullanmak için işlem hattınızın önizleme kanalını kullanacak şekilde yapılandırılması gerekir.
Bazı veri kaynaklarının SQL'de eşdeğer desteği yoktur. Lakehouse Federasyon'unu bu veri kaynaklarından biriyle kullanamıyorsanız, kaynaktan veri almak için Python'ı kullanabilirsiniz. Python ve SQL kaynak dosyalarını aynı işlem hattına ekleyebilirsiniz. Aşağıdaki örnek, uzak bir PostgreSQL tablosundaki verilerin geçerli durumuna erişmek için gerçekleştirilmiş bir görünüm bildirir:
import dp
@dp.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Bulut nesne depolama alanından küçük veya statik veri kümeleri yükleme
Apache Spark yük söz dizimini kullanarak küçük veya statik veri kümelerini yükleyebilirsiniz. Lakeflow Spark Bildirimli İşlem Hatları, Azure Databricks'te Apache Spark tarafından desteklenen tüm dosya biçimlerini destekler. Tam liste için bkz. Veri biçimi seçenekleri.
Aşağıdaki örneklerde, tablo oluşturmak için JSON yükleme gösterilmektedir:
Piton
@dp.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
Uyarı
read_files SQL işlevi, Azure Databricks'te tüm SQL ortamları için ortaktır. İşlem hatlarında SQL kullanarak doğrudan dosya erişimi için önerilen desendir. Daha fazla bilgi için bkz. Seçenekler.
Python özel veri kaynağından veri yükleme
Python özel veri kaynakları, verileri özel biçimlerde yüklemenize olanak sağlar. Belirli bir dış veri kaynağından okumak ve bu kaynağa yazmak için kod yazabilir veya kendi iç sistemlerinizdeki verileri okumak için mevcut sistemlerinizdeki mevcut Python kodundan yararlanabilirsiniz. Python veri kaynakları geliştirme hakkında daha fazla ayrıntı için bkz. PySpark özel veri kaynakları.
İşlem hattına veri yüklemek için özel bir Python veri kaynağı kullanmak amacıyla, onu my_custom_datasource gibi bir biçim adıyla kaydedin ve ardından bu kaynaktan okuyun.
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.
# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
return spark.read.format("my_custom_datasource").load()
# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
return spark.readStream.format("my_custom_datasource").load()
Kaynak akış tablosundaki değişiklikleri yoksayacak şekilde bir akış tablosunu yapılandırma
Uyarı
Varsayılan olarak, akış tabloları yalnızca ekleme kaynakları gerektirir. Akış tablosu, kaynak olarak başka bir akış tablosu kullandığında ve kaynak akış tablosunun güncellenmesi veya silinmesi gerektiğinde (örneğin, GDPR "unutulma hakkı" işlemesi), bu değişiklikleri göz ardı edebilmek için kaynak akış tablosu okunduğunda skipChangeCommits bayrağı ayarlanabilir. Bu seçenek hakkında daha fazla bilgi için Güncelleştirmeleri ve silmeleri yoksayma bölümüne bakın.
@dp.table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
İşlem hattında gizli bilgilerle depolama kimlik bilgilerine güvenli bir şekilde erişme
Erişim anahtarları veya parolalar gibi kimlik bilgilerini depolamak için Azure Databricks gizli dizileri kullanabilirsiniz. Gizliyi işlem hattınıza yapılandırmak için işlem hattı ayarları küme yapılandırmasında bir Spark özelliği kullanın. Boru hatları için klasik işlem yapılandırmasını yapılandırbkz.
Aşağıdaki örnek, Otomatik Yükleyicikullanarak Azure Data Lake Storage (ADLS) depolama hesabından giriş verilerini okumak için gereken erişim anahtarını depolamak amacıyla bir gizli bilgi kullanır. İşlem hattınızın gerektirdiği gizli dizileri (örneğin, S3'e erişmek için AWS anahtarları veya apache Hive meta veri deposu parolası) yapılandırmak için aynı yöntemi kullanabilirsiniz.
Azure Data Lake Storage ile çalışma hakkında daha fazla bilgi edinmek için bkz. Azure Data Lake Storage'a bağlanma ve Blob Depolama.
Uyarı
gizli dizi değerini ayarlayan spark.hadoop. yapılandırma anahtarına spark_conf ön ekini eklemeniz gerekir.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}
Değiştir
- ADLS depolama hesabı adı ile
<storage-account-name>kullanın. -
<scope-name>Azure Databricks gizli kapsam adıyla. -
<secret-name>Azure depolama hesabı erişim anahtarını içeren anahtarın ismiyle.
from pyspark import pipelines as dp
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Değiştir
-
<container-name>giriş verilerini depolayan Azure depolama hesabı kapsayıcısının adıyla. - ADLS depolama hesabı adı ile
<storage-account-name>kullanın. - Giriş veri kümesine giden yol
<path-to-input-dataset>ile.