Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
İşlem hatlarını kullanarak Azure Databricks üzerinde Apache Spark tarafından desteklenen herhangi bir veri kaynağından veri yükleyebilirsiniz. Lakeflow Spark Deklaratif Boru Hatları içinde, Spark DataFrame döndüren herhangi bir sorguya karşı veri kümeleri—tablolar ve görünümler—tanımlayabilirsiniz; bu sorgulara, akış DataFrame'leri ve Spark DataFrame'ler için Pandas dahildir. Databricks, veri alımı görevleri için çoğu kullanım örneğinde akış tablolarının kullanılmasını önerir. Akış tabloları, Auto Loader kullanarak bulut nesne depolama birimlerinden ya da Kafka gibi ileti veri yollarından veri almak için kullanışlıdır.
Tüm veri kaynaklarının aktarım için SQL desteği bulunmamaktadır. Ancak gerektiğinde Python kullanmak için SQL ve Python kaynaklarını aynı işlem hattında karıştırabilirsiniz. Varsayılan olarak Lakeflow Spark Bildirimli İşlem Hatlarında paketlenmemiş kitaplıklarla çalışma hakkında ayrıntılı bilgi için bkz. İşlem hatları için Python bağımlılıklarını yönetme. Azure Databricks'te veri girişi hakkında genel bilgi için daha fazla bilgi edinmek üzere Lakeflow Connect'te standart bağlayıcılar bölümüne bakın.
Aşağıdaki örneklerde bazı yaygın veri yükleme desenleri gösterilmektedir.
Var olan bir tablodan yükleme
Azure Databricks'teki mevcut tablolardan veri yükleme. Sorgu kullanarak verileri dönüştürebilir veya işlem hattınızda daha fazla işlem için tabloyu yükleyebilirsiniz.
Piton
@dp.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)
SQL
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
Dosyaları bulut nesne depolama alanından yükleme
Databricks, bulut nesne depolamasından veya Unity Kataloğu birimindeki dosyalardan veri alımı görevlerinin çoğu için işlem hatlarında Otomatik Yükleyici'nin kullanılmasını önerir. Auto Loader ve işlem hatları, sürekli büyüyen verileri bulut depolama alanına ulaşırken, artımlı ve değişmez bir şekilde yüklemek üzere tasarlanmıştır. Bkz. Otomatik Yükleyici nedir? ve Nesne depolamadan veri yükleme.
Aşağıdaki örnek, Otomatik Yükleyici'yi kullanarak bulut depolamadan verileri okur.
Piton
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
)
SQL
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
format => "json"
);
Aşağıdaki örneklerde, Unity Kataloğu birimindeki CSV dosyalarından veri kümeleri oluşturmak için Otomatik Yükleyici kullanılır.
Piton
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/retail_org/customers/",
format => "csv"
)
Uyarı
- Dosya bildirimleriyle Otomatik Yükleyici'yi kullanır ve işlem hattınız veya akış tablonuz için tam yenileme çalıştırırsanız kaynaklarınızı el ile temizlemeniz gerekir. Temizleme gerçekleştirmek için not defterindeki CloudFilesResourceManager kullanabilirsiniz.
- Unity Kataloğu etkin bir işlem hattında Otomatik Yükleyici ile dosya yüklemek için dış konumları kullanmanız gerekir. Unity Kataloğu'nu işlem hatlarıyla kullanma hakkında daha fazla bilgi edinmek için bkz. Unity Kataloğu'nu işlem hatlarıyla kullanma.
Bulut depolamada kimlik doğrulaması
Otomatik Yükleyici, bulut depolamada kimlik doğrulaması yapmak için Unity Kataloğu dış konumlarını kullanır. Okumak istediğiniz depolama yolu için bir dış konum yapılandırmanız ve yürüten kullanıcıya ayrıcalığı tanımanız READ FILES gerekir.
Azure Data Lake Storage'den veri almak için, bir depolama kapsayıcısını referans alan bir depolama kimlik bilgisi tarafından yedeklenen bir harici konum yapılandırın. Daha fazla bilgi için bkz. Unity Kataloğu'nu kullanarak bulut nesne depolamasına bağlanma.
İleti veri yolu'ndan veri yükleme
İleti veri hatlarından veri almak için işlem hatlarını yapılandırabilirsiniz. Databricks, ileti veri kümelerinden düşük gecikme süreli yükleme için en verimli alımı sağlamak üzere sürekli yürütme ve gelişmiş otomatik ölçeklendirme ile akış tablolarının kullanılmasını önerir. Daha fazla bilgi için bkz. Otomatik Ölçeklendirme ile Lakeflow Spark Bildirimli İşlem Hatlarının küme kullanımını iyileştirme.
Örneğin aşağıdaki kod, read_kafka işlevini kullanarak Kafka'dan veri almak için bir akış tablosu yapılandırır.
Piton
from pyspark import pipelines as dp
@dp.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'topic1'
);
Diğer ileti veri yolu kaynaklarından almak için aşağıdaki bilgilere bakınız:
- Kinesis: Read_Kinesis
- Pub/Sub konusu: read_pubsub
- Pulsar: read_pulsar
Azure Event Hubs den veri yükleme
Azure Event Hubs, Apache Kafka uyumlu bir arabirim sağlayan bir veri akışı hizmetidir. Azure Event Hubs'dan ileti yüklemek için Lakeflow Spark Bildirimli İşlem Hatları çalışma zamanına dahil edilen Yapılandırılmış Akış Kafka bağlayıcısını kullanabilirsiniz. Azure Event Hubs'dan iletileri yükleme ve işleme hakkında daha fazla bilgi edinmek için bkz. Azure Event Hubs'ı işlem hattı veri kaynağı olarak kullanma.
Dış sistemlerden veri yükleme
Lakeflow Spark Bildirimli İşlem Hatları, Azure Databricks tarafından desteklenen herhangi bir veri kaynağından veri yüklemeyi destekler. Bkz . Veri kaynaklarına ve dış hizmetlere bağlanma. Dış verileri, veri kaynaklarınıntarafından desteklendiği durumlarda Lakehouse Federasyonu kullanarak da yükleyebilirsiniz. Lakehouse Federation, Databricks Runtime 13.3 LTS veya üzerini gerektirdiğinden, Lakehouse Federation'ı kullanmak için işlem hattınızı önizleme kanalını kullanacak şekilde yapılandırın.
Bazı veri kaynaklarının eşdeğer SQL desteği yoktur. Lakehouse Federasyon'unu bu veri kaynaklarından biriyle kullanamıyorsanız, kaynaktan veri almak için Python'ı kullanabilirsiniz. Python ve SQL kaynak dosyalarını aynı işlem hattına ekleyebilirsiniz. Aşağıdaki örnek, uzak bir PostgreSQL tablosundaki verilerin mevcut durumuna erişmek için maddileştirilmiş bir görünüm bildirir.
import dp
@dp.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Bulut nesne depolama alanından küçük veya statik veri kümeleri yükleme
Apache Spark yük söz dizimini kullanarak küçük veya statik veri kümelerini yükleyebilirsiniz. Lakeflow Spark Bildirimli İşlem Hatları, Azure Databricks'te Apache Spark tarafından desteklenen tüm dosya biçimlerini destekler. Tam liste için bkz. Veri biçimi seçenekleri.
Aşağıdaki örneklerde, tablo oluşturmak için JSON yükleme gösterilmektedir.
Piton
@dp.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
Uyarı
read_files SQL işlevi, Azure Databricks'te tüm SQL ortamları için ortaktır. İşlem hatlarında SQL kullanarak doğrudan dosya erişimi için önerilen desendir. Daha fazla bilgi için bkz. Seçenekler.
Python özel veri kaynağından veri yükleme
Python özel veri kaynakları, verileri özel biçimlerde yüklemenize olanak sağlar. Belirli bir dış veri kaynağından okumak ve bu kaynağa yazmak için kod yazabilir veya kendi iç sistemlerinizdeki verileri okumak için mevcut Python kodunuzu kullanabilirsiniz. Python veri kaynakları geliştirme hakkında daha fazla ayrıntı için bkz. PySpark özel veri kaynakları.
Aşağıdaki örnek, özel bir veri kaynağını biçim adıyla my_custom_datasource kaydeder ve hem toplu işlem hem de akış modlarında bu kaynaktan okur.
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.
# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
return spark.read.format("my_custom_datasource").load()
# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
return spark.readStream.format("my_custom_datasource").load()
Kaynak akış tablosundaki değişiklikleri yoksayacak şekilde bir akış tablosunu yapılandırma
Varsayılan olarak, akış tabloları yalnızca ekleme kaynakları gerektirir. Kaynak akış tablonuz güncelleştirme veya silme gerektiriyorsa—örneğin, GDPR "unutulma hakkı" işlemesi için—bu değişiklikleri yoksaymak için skipChangeCommits bayrağını kullanın. Bu bayrak yalnızca spark.readStream işlevini kullanarak option() çalışır ve kaynak akış tablosu bir create_auto_cdc_flow() işlevinin hedefi olduğunda kullanılamaz. Daha fazla bilgi için bkz. Kaynak Delta tablolarındaki değişiklikleri işleme.
@dp.table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
İşlem hattında gizli bilgilerle depolama kimlik bilgilerine güvenli bir şekilde erişme
Erişim anahtarları veya parolalar gibi kimlik bilgilerini depolamak için Azure Databricks gizli dizileri kullanabilirsiniz. Gizliyi işlem hattınıza yapılandırmak için işlem hattı ayarları küme yapılandırmasında bir Spark özelliği kullanın. Boru hatları için klasik işlem yapılandırmasını yapılandırbkz.
Aşağıdaki örnek, Auto Loader kullanarak bir Azure Data Lake Storage depolama hesabından veri okumak için gereken erişim anahtarını depolamak amacıyla bir gizli anahtar kullanır. İşlem hattınızın gerektirdiği gizli dizileri (örneğin, S3'e erişmek için AWS anahtarları veya apache Hive meta veri deposu parolası) yapılandırmak için aynı yöntemi kullanabilirsiniz.
Azure Data Lake Storage ile çalışma hakkında daha fazla bilgi edinmek için bkz. Azure Data Lake Storage'a bağlanma ve Blob Depolama.
Uyarı
gizli dizi değerini ayarlayan spark.hadoop. yapılandırma anahtarına spark_conf ön ekini eklemeniz gerekir.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path>",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}
Bu kod örneğinde aşağıdaki değerleri değiştirin.
| Yer tutucu | bununla değiştir |
|---|---|
<container-name> |
Azure depolama hesabı kapsayıcısının adı. |
<storage-account-name> |
ADLS depolama hesabı adı. |
<path> |
İşlem hattı çıktı verilerinin ve meta verilerinin yolu. |
<scope-name> |
Azure Databricks gizli kapsam adı. |
<secret-name> |
Azure depolama hesabı erişim anahtarını içeren anahtarın adı. |
from pyspark import pipelines as dp
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Bu kod örneğinde aşağıdaki değerleri değiştirin.
| Yer tutucu | bununla değiştir |
|---|---|
<container-name> |
Giriş verilerini depolayan Azure depolama hesabı kapsayıcısının adı. |
<storage-account-name> |
ADLS depolama hesabı adı. |
<path-to-input-dataset> |
Giriş veri kümesinin yolu. |