Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Yapılandırılmış Akış kullanarak akış veri kaynaklarını sorgulamak için Azure Databricks'i kullanabilirsiniz. Azure Databricks, Python ve Scala'da akış iş yükleri için kapsamlı destek sağlar ve SQL ile Çoğu Yapılandırılmış Akış işlevini destekler.
Aşağıdaki örneklerde, not defterlerinde etkileşimli geliştirme sırasında akış verilerinin el ile incelenmesi için bellek havuzu kullanımı gösterilmektedir. Not defteri kullanıcı arabirimindeki satır çıktı sınırları nedeniyle, akış sorguları tarafından okunan tüm verileri gözlemlemeyebilirsiniz. Üretim iş yüklerinde, akış sorgularını yalnızca bir hedef tabloya veya dış sisteme yazarak tetiklemeniz gerekir.
Dikkat
Akış verileriyle ilgili etkileşimli sorgular için SQL desteği, tüm amaçlı işlemlerde çalışan not defterleriyle sınırlıdır. Databricks SQL veya Lakeflow Spark Bildirimli İşlem Hatlarında akış tabloları bildirirken SQL de kullanabilirsiniz. Bkz Akış tabloları ve Lakeflow Spark Bildirimli İşlem Hatları.
Akış sistemlerinden verileri sorgulama
Azure Databricks, aşağıdaki akış sistemleri için akış veri okuyucuları sağlar:
- Kafka
- Kinezi
- PubSub
- Pulsar
Bu sistemlere yönelik sorguları başlatırken, seçtiğiniz ortama ve okumak istediğiniz sisteme bağlı olarak değişen yapılandırma ayrıntılarını sağlamanız gerekir. Bkz. Lakeflow Connect'te standart bağlayıcılar.
Akış sistemlerini içeren yaygın iş yükleri, lakehouse'a veri alma ve verileri dış sistemlere aktarmak için akış işlemesini içerir. Akış iş yükleri hakkında daha fazla bilgi için bkz. Yapılandırılmış Akış kavramları.
Aşağıdaki örneklerde Kafka'dan okunan etkileşimli bir akış gösterilmektedir:
Piton
display(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'latest'
);
Tabloyu akış okuma yöntemiyle sorgulama
Azure Databricks varsayılan olarak Delta Lake kullanarak tüm tabloları oluşturur. Delta tablosunda akış sorgusu gerçekleştirdiğinizde, tablonun bir sürümü işlendiğinde sorgu yeni kayıtları otomatik olarak alır. Varsayılan olarak, akış sorguları kaynak tabloların yalnızca eklenen kayıtları içermesini bekler. Güncelleştirmeleri ve silmeleri içeren akış verileriyle çalışmanız gerekiyorsa Databricks, Lakeflow Spark Bildirimli İşlem Hatlarının ve AUTO CDC ... INTOkullanılmasını önerir. Bkz AUTO CDC API'leri: İşlem hatlarıyla değişiklik verilerini yakalamayı basitleştirin.
Aşağıdaki örnekler, bir tablodan okunan etkileşimli bir akış gerçekleştirmeyi gösterir:
Piton
display(spark.readStream.table("table_name"))
SQL
SELECT * FROM STREAM table_name
Otomatik Yükleyici ile bulut nesne depolamasında verileri sorgulama
Azure Databricks bulut veri bağlayıcısı olan Otomatik Yükleyici'yi kullanarak bulut nesne depolama alanından veri akışı yapabilirsiniz. Bağlayıcıyı Unity Kataloğu birimlerinde veya diğer bulut nesne depolama konumlarında depolanan dosyalarla kullanabilirsiniz. Databricks, bulut nesne depolamadaki verilere erişimi yönetmek için birimlerin kullanılmasını önerir. Bkz . Veri kaynaklarına ve dış hizmetlere bağlanma.
Azure Databricks, popüler biçimlendirilmiş, yarı biçimlendirilmiş ve biçimlendirilmemiş biçimlerde depolanan bulut nesne depolama verilerinin sürekli olarak alımı için bu bağlayıcıyı optimize eder. Databricks, bozuk kayıtlar veya şema değişiklikleri nedeniyle aktarım hızını en üst düzeye çıkarmak ve olası veri kaybını en aza indirmek için alınan verilerin neredeyse ham biçimde depolanmasını önerir.
Bulut nesne depolama alanından veri alma hakkında daha fazla öneri için bkz. Lakeflow Connect'te standart bağlayıcılar.
Aşağıdaki örneklerde bir birimdeki JSON dosyalarının dizininden okunan etkileşimli bir akış gösterilmektedir:
Piton
display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))
SQL
SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')