Gegevens kopiëren van of naar MongoDB met behulp van Azure Data Factory of Synapse Analytics
VAN TOEPASSING OP: Azure Data Factory Azure Synapse Analytics
Tip
Probeer Data Factory uit in Microsoft Fabric, een alles-in-één analyseoplossing voor ondernemingen. Microsoft Fabric omvat alles, van gegevensverplaatsing tot gegevenswetenschap, realtime analyses, business intelligence en rapportage. Meer informatie over het gratis starten van een nieuwe proefversie .
In dit artikel wordt beschreven hoe u de kopieeractiviteit in Azure Data Factory Synapse Analytics-pijplijnen gebruikt om gegevens van en naar een MongoDB-database te kopiëren. Het is gebaseerd op het artikel over het overzicht van kopieeractiviteiten met een algemeen overzicht van de kopieeractiviteit.
Belangrijk
De nieuwe MongoDB-connector biedt verbeterde systeemeigen MongoDB-ondersteuning. Als u de verouderde MongoDB-connector in uw oplossing gebruikt, die alleen wordt ondersteund voor compatibiliteit met eerdere versies, raadpleegt u het artikel mongoDB-connector (verouderd ).
Ondersteunde mogelijkheden
Deze MongoDB-connector wordt ondersteund voor de volgende mogelijkheden:
Ondersteunde mogelijkheden | IR |
---|---|
Copy-activiteit (bron/sink) | (1) (2) |
(1) Azure Integration Runtime (2) Zelf-hostende Integration Runtime
Zie de tabel Ondersteunde gegevensarchieven voor een lijst met gegevensarchieven die worden ondersteund als bronnen/sinks.
Deze MongoDB-connector ondersteunt met name versies tot 4.2. Als voor uw werk nieuwere versies dan 4.2 zijn vereist, kunt u MongoDB Atlas gebruiken met de MongoDB Atlas-connector, die uitgebreidere ondersteuning en functies biedt.
Vereisten
Als uw gegevensarchief zich in een on-premises netwerk, een virtueel Azure-netwerk of een virtuele particuliere cloud van Amazon bevindt, moet u een zelf-hostende Integration Runtime configureren om er verbinding mee te maken.
Als uw gegevensarchief een beheerde cloudgegevensservice is, kunt u De Azure Integration Runtime gebruiken. Als de toegang is beperkt tot IP-adressen die zijn goedgekeurd in de firewallregels, kunt u IP-adressen van Azure Integration Runtime toevoegen aan de acceptatielijst.
U kunt ook de beheerde functie voor integratieruntime voor virtuele netwerken in Azure Data Factory gebruiken om toegang te krijgen tot het on-premises netwerk zonder een zelf-hostende Integration Runtime te installeren en te configureren.
Zie Strategieën voor gegevenstoegang voor meer informatie over de netwerkbeveiligingsmechanismen en -opties die door Data Factory worden ondersteund.
Aan de slag
Als u de kopieeractiviteit wilt uitvoeren met een pijplijn, kunt u een van de volgende hulpprogramma's of SDK's gebruiken:
- Het hulpprogramma voor het kopiëren van gegevens
- Azure Portal
- De .NET-SDK
- De Python-SDK
- Azure PowerShell
- De REST API
- Een Azure Resource Manager-sjabloon
Een gekoppelde service maken voor MongoDB met behulp van de gebruikersinterface
Gebruik de volgende stappen om een gekoppelde service te maken voor MongoDB in de gebruikersinterface van Azure Portal.
Blader naar het tabblad Beheren in uw Azure Data Factory- of Synapse-werkruimte en selecteer Gekoppelde services en klik vervolgens op Nieuw:
Zoek naar MongoDB en selecteer de MongoDB-connector.
Configureer de servicedetails, test de verbinding en maak de nieuwe gekoppelde service.
Configuratiedetails van connector
De volgende secties bevatten details over eigenschappen die worden gebruikt om Data Factory-entiteiten te definiëren die specifiek zijn voor MongoDB-connector.
Eigenschappen van gekoppelde service
De volgende eigenschappen worden ondersteund voor de gekoppelde MongoDB-service:
Eigenschappen | Beschrijving | Vereist |
---|---|---|
type | De typeeigenschap moet worden ingesteld op: MongoDbV2 | Ja |
connectionString | Geef de MongoDB-verbindingsreeks bijvoorbeeld mongodb://[username:password@]host[:port][/[database][?options]] op. Raadpleeg de MongoDB-handleiding voor verbindingsreeks voor meer informatie. U kunt ook een verbindingsreeks in Azure Key Vault plaatsen. Raadpleeg De referenties van de Store in Azure Key Vault met meer informatie. |
Ja |
database | De naam van de database waartoe u toegang wilt hebben. | Ja |
connectVia | De Integration Runtime die moet worden gebruikt om verbinding te maken met het gegevensarchief. Meer informatie vindt u in de sectie Vereisten . Als dit niet is opgegeven, wordt de standaard Azure Integration Runtime gebruikt. | Nee |
Voorbeeld:
{
"name": "MongoDBLinkedService",
"properties": {
"type": "MongoDbV2",
"typeProperties": {
"connectionString": "mongodb://[username:password@]host[:port][/[database][?options]]",
"database": "myDatabase"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Eigenschappen van gegevensset
Zie Gegevenssets en gekoppelde services voor een volledige lijst met secties en eigenschappen die beschikbaar zijn voor het definiëren van gegevenssets. De volgende eigenschappen worden ondersteund voor mongoDB-gegevensset:
Eigenschappen | Beschrijving | Vereist |
---|---|---|
type | De typeeigenschap van de gegevensset moet worden ingesteld op: MongoDbV2Collection | Ja |
collectionName | De naam van de verzameling in de MongoDB-database. | Ja |
Voorbeeld:
{
"name": "MongoDbDataset",
"properties": {
"type": "MongoDbV2Collection",
"typeProperties": {
"collectionName": "<Collection name>"
},
"schema": [],
"linkedServiceName": {
"referenceName": "<MongoDB linked service name>",
"type": "LinkedServiceReference"
}
}
}
Eigenschappen van de kopieeractiviteit
Zie het artikel Pijplijnen voor een volledige lijst met secties en eigenschappen die beschikbaar zijn voor het definiëren van activiteiten. Deze sectie bevat een lijst met eigenschappen die worden ondersteund door mongoDB-bron en -sink.
MongoDB als bron
De volgende eigenschappen worden ondersteund in de sectie bron van kopieeractiviteit:
Eigenschappen | Beschrijving | Vereist |
---|---|---|
type | De typeeigenschap van de bron van de kopieeractiviteit moet worden ingesteld op: MongoDbV2Source | Ja |
filter | Hiermee geeft u selectiefilter op met behulp van queryoperators. Als u alle documenten in een verzameling wilt retourneren, laat u deze parameter weg of geeft u een leeg document ({}) door. | Nee |
cursorMethods.project | Hiermee geeft u de velden die moeten worden geretourneerd in de documenten voor projectie. Als u alle velden in de overeenkomende documenten wilt retourneren, laat u deze parameter weg. | Nee |
cursorMethods.sort | Hiermee geeft u de volgorde op waarin de query overeenkomende documenten retourneert. Raadpleeg cursor.sort(). | Nee |
cursorMethods.limit | Hiermee geeft u het maximum aantal documenten dat de server retourneert. Raadpleeg cursor.limit(). | Nee |
cursorMethods.skip | Hiermee geeft u het aantal documenten dat moet worden overgeslagen en van waaruit MongoDB begint met het retourneren van resultaten. Raadpleeg cursor.skip(). | Nee |
batchSize | Hiermee geeft u het aantal documenten op dat moet worden geretourneerd in elke batch van het antwoord van het MongoDB-exemplaar. In de meeste gevallen heeft het wijzigen van de batchgrootte geen invloed op de gebruiker of de toepassing. Azure Cosmos DB beperkt elke batch mag niet groter zijn dan 40 MB. Dit is de som van de grootte van het batchSize-aantal documenten, dus verklein deze waarde als uw documentgrootte groot is. | Nee (de standaardwaarde is 100) |
Tip
De service biedt ondersteuning voor het verbruik van BSON-documenten in de strikte modus. Zorg ervoor dat uw filterquery zich in de strikte modus bevindt in plaats van de Shell-modus. Meer beschrijving vindt u in de Handleiding van MongoDB.
Voorbeeld:
"activities":[
{
"name": "CopyFromMongoDB",
"type": "Copy",
"inputs": [
{
"referenceName": "<MongoDB input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "MongoDbV2Source",
"filter": "{datetimeData: {$gte: ISODate(\"2018-12-11T00:00:00.000Z\"),$lt: ISODate(\"2018-12-12T00:00:00.000Z\")}, _id: ObjectId(\"5acd7c3d0000000000000000\") }",
"cursorMethods": {
"project": "{ _id : 1, name : 1, age: 1, datetimeData: 1 }",
"sort": "{ age : 1 }",
"skip": 3,
"limit": 3
}
},
"sink": {
"type": "<sink type>"
}
}
}
]
MongoDB als sink
De volgende eigenschappen worden ondersteund in de sectie Sink voor kopieeractiviteit:
Eigenschappen | Beschrijving | Vereist |
---|---|---|
type | De typeeigenschap van de sink voor kopieeractiviteit moet worden ingesteld op MongoDbV2Sink. | Ja |
writeBehavior | Hierin wordt beschreven hoe u gegevens naar MongoDB schrijft. Toegestane waarden: invoegen en upsert. Het gedrag van upsert is om het document te vervangen als er al een document bestaat _id ; anders voegt u het document in.Opmerking: De service genereert automatisch een _id voor een document als een _id document niet is opgegeven in het oorspronkelijke document of door kolomtoewijzing. Dit betekent dat u ervoor moet zorgen dat uw document een id heeft, zodat upsert werkt zoals verwacht. |
Nee (de standaardwaarde is invoegen) |
writeBatchSize | De eigenschap writeBatchSize bepaalt de grootte van documenten die in elke batch moeten worden geschreven. U kunt proberen de waarde voor writeBatchSize te verhogen om de prestaties te verbeteren en de waarde te verlagen als uw documentgrootte groot is. | Nee (de standaardwaarde is 10.000) |
writeBatchTimeout | De wachttijd voordat de batchinvoegbewerking is voltooid voordat er een time-out optreedt. De toegestane waarde is tijdspanne. | Nee (de standaardwaarde is 00:30:00 - 30 minuten) |
Tip
Als u JSON-documenten als zodanig wilt importeren, raadpleegt u de sectie JSON-documenten importeren of exporteren. Als u gegevens in tabelvorm wilt kopiëren, raadpleegt u Schematoewijzing.
Voorbeeld
"activities":[
{
"name": "CopyToMongoDB",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Document DB output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "MongoDbV2Sink",
"writeBehavior": "upsert"
}
}
}
]
JSON-documenten importeren en exporteren
U kunt deze MongoDB-connector gebruiken om eenvoudig het volgende te doen:
- Documenten kopiëren tussen twee MongoDB-verzamelingen.
- Importeer JSON-documenten uit verschillende bronnen naar MongoDB, waaronder van Azure Cosmos DB, Azure Blob Storage, Azure Data Lake Store en andere ondersteunde opslag op basis van bestanden.
- Exporteer JSON-documenten uit een MongoDB-verzameling naar verschillende bestandsarchieven.
Als u een dergelijke schemaagnostische kopie wilt bereiken, slaat u de sectie 'structuur' (ook wel schema genoemd) over in de gegevensset en schematoewijzing in de kopieeractiviteit.
Schematoewijzing
Als u gegevens van MongoDB wilt kopiëren naar een tabelvormige sink of omgekeerd, raadpleegt u schematoewijzing.
De gekoppelde MongoDB-service upgraden
Hier volgen stappen waarmee u uw gekoppelde service en gerelateerde query's kunt upgraden:
Maak een nieuwe gekoppelde MongoDB-service en configureer deze door te verwijzen naar de eigenschappen van de gekoppelde service.
Als u SQL-query's in uw pijplijnen gebruikt die verwijzen naar de oude gekoppelde MongoDB-service, vervangt u deze door de equivalente MongoDB-query's. Zie de volgende tabel voor de vervangingsvoorbeelden:
SQL-query Equivalente MongoDB-query SELECT * FROM users
db.users.find({})
SELECT username, age FROM users
db.users.find({}, {username: 1, age: 1})
SELECT username AS User, age AS Age, statusNumber AS Status, CASE WHEN Status = 0 THEN "Pending" CASE WHEN Status = 1 THEN "Finished" ELSE "Unknown" END AS statusEnum LastUpdatedTime + interval '2' hour AS NewLastUpdatedTime FROM users
db.users.aggregate([{ $project: { _id: 0, User: "$username", Age: "$age", Status: "$statusNumber", statusEnum: { $switch: { branches: [ { case: { $eq: ["$Status", 0] }, then: "Pending" }, { case: { $eq: ["$Status", 1] }, then: "Finished" } ], default: "Unknown" } }, NewLastUpdatedTime: { $add: ["$LastUpdatedTime", 2 * 60 * 60 * 1000] } } }])
SELECT employees.name, departments.name AS department_name FROM employees LEFT JOIN departments ON employees.department_id = departments.id;
db.employees.aggregate([ { $lookup: { from: "departments", localField: "department_id", foreignField: "_id", as: "department" } }, { $unwind: "$department" }, { $project: { _id: 0, name: 1, department_name: "$department.name" } } ])
Gerelateerde inhoud
Zie ondersteunde gegevensarchieven voor een lijst met gegevensarchieven die worden ondersteund als bronnen en sinks door de kopieeractiviteit.