Menyalin data dari MongoDB menggunakan Azure Data Factory atau Synapse Analytics
BERLAKU UNTUK: Azure Data Factory Azure Synapse Analytics
Tip
Cobalah Data Factory di Microsoft Fabric, solusi analitik all-in-one untuk perusahaan. Microsoft Fabric mencakup semuanya mulai dari pergerakan data hingga ilmu data, analitik real time, kecerdasan bisnis, dan pelaporan. Pelajari cara memulai uji coba baru secara gratis!
Artikel ini menguraikan cara menggunakan aktivitas Penyalinan di alur Azure Data Factory dan Azure Synapse Analytics untuk menyalin data dari dan ke database MongoDB. Artikel tersebut dibuat berdasarkan artikel gambaran umum aktivitas salin yang menyajikan gambaran umum aktivitas salin.
Penting
Konektor MongoDB baru memberikan dukungan MongoDB asli yang lebih baik. Jika Anda menggunakan konektor MongoDB sebelumnya dalam solusi Anda yang didukung apa adanya untuk kompatibilitas mundur, lihat artikel Konektor MongoDB (warisan).
Kemampuan yang didukung
Konektor MongoDB ini didukung untuk kemampuan berikut:
Kemampuan yang didukung | IR |
---|---|
Salin aktivitas (sumber/sink) | (1) (2) |
① Runtime integrasi Azure ② Runtime integrasi yang dihost sendiri
Untuk daftar penyimpanan data yang didukung sebagai sumber/sink, lihat tabel Penyimpanan data yang didukung.
Secara khusus, konektor MongoDB ini mendukung versi hingga 4.2. Jika pekerjaan Anda memerlukan versi yang lebih baru dari 4.2, pertimbangkan untuk menggunakan MongoDB Atlas dengan konektor MongoDB Atlas, yang menyediakan dukungan dan fitur yang lebih komprehensif.
Prasyarat
Jika penyimpanan data Anda terletak di dalam jaringan lokal, jaringan virtual Azure, atau Amazon Virtual Private Cloud, Anda harus mengonfigurasi runtime integrasi yang dihosting sendiri untuk menghubungkannya.
Jika penyimpanan data Anda adalah layanan data cloud terkelola, Anda dapat menggunakan Azure Integration Runtime. Jika akses dibatasi untuk IP yang disetujui dalam aturan firewall, Anda dapat menambahkan IP Azure Integration Runtime ke daftar izinkan.
Anda juga dapat menggunakan fitur runtime integrasi jaringan virtual terkelola di Azure Data Factory untuk mengakses jaringan lokal tanpa menginstal dan mengonfigurasi runtime integrasi yang dihosting sendiri.
Untuk informasi selengkapnya tentang mekanisme dan opsi keamanan jaringan yang didukung oleh Data Factory, lihat Strategi akses data.
Memulai
Untuk melakukan aktivitas Salin dengan alur, Anda dapat menggunakan salah satu alat atau SDK berikut:
- Alat Penyalinan Data
- Portal Microsoft Azure
- SDK .NET
- SDK Python
- Azure PowerShell
- REST API
- Templat Azure Resource Manager
Membuat layanan tertaut ke MongoDB menggunakan UI
Gunakan langkah-langkah berikut untuk membuat layanan tertaut ke MongoDB di UI portal Azure.
Telusuri ke tab Kelola di ruang kerja Azure Data Factory atau Synapse Anda dan pilih Layanan Tertaut, lalu klik Baru:
Cari MongoDB dan pilih konektor MongoDB.
Konfigurasikan detail layanan, uji koneksi, dan buat layanan tertaut baru.
Detail konfigurasi konektor
Bagian berikut memberikan detail tentang properti yang digunakan untuk menentukan entitas Data Factory khusus untuk konektor MongoDB.
Properti layanan tertaut
Properti berikut ini didukung untuk layanan tertaut MongoDB:
Properti | Deskripsi | Wajib |
---|---|---|
jenis | Jenis properti harus diatur ke: MongoDbV2 | Ya |
connectionString | Tentukan string koneksi MongoDB misalnya mongodb://[username:password@]host[:port][/[database][?options]] . Lihat Panduan MongoDB tentang string koneksi untuk detail selengkapnya. Anda juga dapat menempatkan string koneksi di Azure Key Vault. Lihat Menyimpan informasi masuk di Azure Key Vault dengan detail selengkapnya. |
Ya |
database | Nama database yang ingin Anda akses. | Ya |
connectVia | Runtime integrasi yang akan digunakan untuk menyambungkan ke penyimpanan data. Pelajari selengkapnya dari bagian Prasyarat. Jika tidak ditentukan, Azure Integration Runtime default akan digunakan. | No |
Contoh:
{
"name": "MongoDBLinkedService",
"properties": {
"type": "MongoDbV2",
"typeProperties": {
"connectionString": "mongodb://[username:password@]host[:port][/[database][?options]]",
"database": "myDatabase"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Properti himpunan data
Untuk daftar lengkap bagian dan properti yang tersedia untuk menentukan himpunan data, lihat Himpunan data dan layanan tertaut. Properti berikut ini didukung untuk himpunan data MongoDB:
Properti | Deskripsi | Wajib |
---|---|---|
jenis | Properti jenis himpunan data harus diatur ke: MongoDbV2Collection | Ya |
collectionName | Nama koleksi dalam database MongoDB. | Ya |
Contoh:
{
"name": "MongoDbDataset",
"properties": {
"type": "MongoDbV2Collection",
"typeProperties": {
"collectionName": "<Collection name>"
},
"schema": [],
"linkedServiceName": {
"referenceName": "<MongoDB linked service name>",
"type": "LinkedServiceReference"
}
}
}
Properti aktivitas salin
Untuk daftar lengkap bagian dan properti yang tersedia untuk menentukan aktivitas, lihat artikel Alur. Bagian ini menyediakan daftar properti yang didukung oleh sumber dan sink MongoDB.
MongoDB sebagai sumber
Berikut ini properti yang didukung di bagian sumber aktivitas salin:
Properti | Deskripsi | Wajib |
---|---|---|
jenis | Properti jenis sumber aktivitas penyalinan harus diatur ke: MongoDbV2Source | Ya |
filter | Menentukan filter pilihan menggunakan operator kueri. Untuk mengembalikan semua dokumen dalam koleksi, hilangkan parameter ini atau teruskan dokumen kosong ({}). | No |
cursorMethods.project | Menentukan bidang yang akan dikembalikan dalam dokumen untuk proyeksi. Untuk mengembalikan semua bidang dalam dokumen yang cocok, hilangkan parameter ini. | No |
cursorMethods.sort | Menentukan urutan kueri mengembalikan dokumen yang cocok. Lihat cursor.sort(). | No |
cursorMethods.limit | Menentukan jumlah maksimum dokumen yang dikembalikan server. Lihat cursor.limit(). | No |
cursorMethods.skip | Menentukan jumlah dokumen yang akan dilompati dan dari mana MongoDB mulai mengembalikan hasil. Lihat cursor.skip(). | No |
batchSize | Menentukan jumlah dokumen yang akan dikembalikan di setiap batch respons dari instans MongoDB. Dalam kebanyakan kasus, memodifikasi ukuran batch tidak akan mempengaruhi pengguna atau aplikasi. Azure Cosmos DB membatasi setiap batch tidak boleh melebihi ukuran 40 MB, yang merupakan jumlah ukuran dokumen batchSize, jadi kurangi nilai ini jika ukuran dokumen Anda besar. | No (defaultnya adalah 100) |
Tip
Layanan mendukung penggunaan dokumen BSON dalam mode Ketat. Pastikan kueri filter Anda dalam mode Ketat, bukan mode Shell. Deskripsi selengkapnya dapat ditemukan di Panduan MongoDB.
Contoh:
"activities":[
{
"name": "CopyFromMongoDB",
"type": "Copy",
"inputs": [
{
"referenceName": "<MongoDB input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "MongoDbV2Source",
"filter": "{datetimeData: {$gte: ISODate(\"2018-12-11T00:00:00.000Z\"),$lt: ISODate(\"2018-12-12T00:00:00.000Z\")}, _id: ObjectId(\"5acd7c3d0000000000000000\") }",
"cursorMethods": {
"project": "{ _id : 1, name : 1, age: 1, datetimeData: 1 }",
"sort": "{ age : 1 }",
"skip": 3,
"limit": 3
}
},
"sink": {
"type": "<sink type>"
}
}
}
]
MongoDB sebagai sink
Properti berikut ini didukung di Salin Aktivitas bagian sink:
Properti | Deskripsi | Wajib |
---|---|---|
jenis | Properti jenis sink Aktivitas Salin harus diatur ke MongoDbV2Sink. | Ya |
writeBehavior | Menjelaskan cara menulis data ke MongoDB. Nilai yang diperbolehkan sisipkan dan upsert. Perilaku upsert bertujuan mengganti dokumen jika dokumen dengan _id yang sama sudah ada; jika tidak, sisipkan dokumen.Catatan: Layanan secara otomatis membuat _id untuk dokumen jika _id tidak ditentukan baik dalam dokumen asli atau dengan pemetaan kolom. Artinya Anda harus memastikan bahwa, agar upsert berfungsi seperti yang diharapkan, dokumen Anda harus memiliki ID. |
No (defaultnya adalah sisipkan) |
writeBatchSize | Properti writeBatchSize mengontrol ukuran dokumen yang ditulis di setiap batch. Anda dapat mencoba meningkatkan nilai untuk writeBatchSize untuk meningkatkan performa dan mengurangi nilai jika ukuran dokumen Anda besar. | No (Nilai defaultnya adalah 10,000) |
writeBatchTimeout | Waktu tunggu untuk operasi insert batch selesai sebelum waktu habis. Nilai yang diperbolehkan adalah rentang waktu. | No (defaultnya adalah 00:30:00 - 30 menit) |
Tip
Untuk mengimpor dokumen JSON sebagaimana adanya, lihat bagian Mengimpor atau mengekspor dokumen JSON; untuk menyalin dari data berbentuk tabular, lihat pemetaan Skema.
Contoh
"activities":[
{
"name": "CopyToMongoDB",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Document DB output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "MongoDbV2Sink",
"writeBehavior": "upsert"
}
}
}
]
Mengimpor dan mengekspor dokumen JSON
Anda dapat menggunakan konektor MongoDB ini dengan mudah:
- Menyalin dokumen antara dua koleksi MongoDB sebagaimana adanya.
- Mengimpor dokumen JSON dari berbagai sumber ke MongoDB, termasuk dari Azure Cosmos DB, penyimpanan Azure Blob, Azure Data Lake Store, dan penyimpanan berbasis file lainnya yang didukung Azure Data Factory.
- Mengekspor dokumen JSON dari koleksi MongoDB ke berbagai penyimpanan berbasis file.
Untuk mencapai salinan skema-agnostik seperti itu, lewati bagian "struktur" (juga disebut skema)dalam himpunan data dan pemetaan skema dalam aktivitas salin.
Pemetaan skema
Untuk menyalin data dari MongoDB ke sink tabular atau sebaliknya, lihat pemetaan skema.
Meningkatkan layanan tertaut MongoDB
Berikut adalah langkah-langkah yang membantu Anda meningkatkan layanan tertaut dan kueri terkait:
Buat layanan tertaut MongoDB baru dan konfigurasikan dengan merujuk ke Properti layanan tertaut.
Jika Anda menggunakan kueri SQL di alur Anda yang merujuk ke layanan tertaut MongoDB lama, ganti dengan kueri MongoDB yang setara. Lihat tabel berikut untuk contoh penggantian:
Kueri SQL Kueri MongoDB yang setara SELECT * FROM users
db.users.find({})
SELECT username, age FROM users
db.users.find({}, {username: 1, age: 1})
SELECT username AS User, age AS Age, statusNumber AS Status, CASE WHEN Status = 0 THEN "Pending" CASE WHEN Status = 1 THEN "Finished" ELSE "Unknown" END AS statusEnum LastUpdatedTime + interval '2' hour AS NewLastUpdatedTime FROM users
db.users.aggregate([{ $project: { _id: 0, User: "$username", Age: "$age", Status: "$statusNumber", statusEnum: { $switch: { branches: [ { case: { $eq: ["$Status", 0] }, then: "Pending" }, { case: { $eq: ["$Status", 1] }, then: "Finished" } ], default: "Unknown" } }, NewLastUpdatedTime: { $add: ["$LastUpdatedTime", 2 * 60 * 60 * 1000] } } }])
SELECT employees.name, departments.name AS department_name FROM employees LEFT JOIN departments ON employees.department_id = departments.id;
db.employees.aggregate([ { $lookup: { from: "departments", localField: "department_id", foreignField: "_id", as: "department" } }, { $unwind: "$department" }, { $project: { _id: 0, name: 1, department_name: "$department.name" } } ])
Konten terkait
Untuk daftar penyimpanan data yang didukung sebagai sumber dan sink oleh aktivitas salin, lihat penyimpanan data yang didukung.