Delen via


Gegevens kopiëren van of naar MongoDB met behulp van Azure Data Factory of Synapse Analytics

VAN TOEPASSING OP: Azure Data Factory Azure Synapse Analytics

Tip

Probeer Data Factory uit in Microsoft Fabric, een alles-in-één analyseoplossing voor ondernemingen. Microsoft Fabric omvat alles, van gegevensverplaatsing tot gegevenswetenschap, realtime analyses, business intelligence en rapportage. Meer informatie over het gratis starten van een nieuwe proefversie .

In dit artikel wordt beschreven hoe u de kopieeractiviteit in Azure Data Factory Synapse Analytics-pijplijnen gebruikt om gegevens van en naar een MongoDB-database te kopiëren. Het is gebaseerd op het artikel over het overzicht van kopieeractiviteiten met een algemeen overzicht van de kopieeractiviteit.

Belangrijk

De nieuwe MongoDB-connector biedt verbeterde systeemeigen MongoDB-ondersteuning. Als u de verouderde MongoDB-connector in uw oplossing gebruikt, die alleen wordt ondersteund voor compatibiliteit met eerdere versies, raadpleegt u het artikel mongoDB-connector (verouderd ).

Ondersteunde mogelijkheden

Deze MongoDB-connector wordt ondersteund voor de volgende mogelijkheden:

Ondersteunde mogelijkheden IR
Copy-activiteit (bron/sink) (1) (2)

(1) Azure Integration Runtime (2) Zelf-hostende Integration Runtime

Zie de tabel Ondersteunde gegevensarchieven voor een lijst met gegevensarchieven die worden ondersteund als bronnen/sinks.

Deze MongoDB-connector ondersteunt met name versies tot 4.2. Als voor uw werk nieuwere versies dan 4.2 zijn vereist, kunt u MongoDB Atlas gebruiken met de MongoDB Atlas-connector, die uitgebreidere ondersteuning en functies biedt.

Vereisten

Als uw gegevensarchief zich in een on-premises netwerk, een virtueel Azure-netwerk of een virtuele particuliere cloud van Amazon bevindt, moet u een zelf-hostende Integration Runtime configureren om er verbinding mee te maken.

Als uw gegevensarchief een beheerde cloudgegevensservice is, kunt u De Azure Integration Runtime gebruiken. Als de toegang is beperkt tot IP-adressen die zijn goedgekeurd in de firewallregels, kunt u IP-adressen van Azure Integration Runtime toevoegen aan de acceptatielijst.

U kunt ook de beheerde functie voor integratieruntime voor virtuele netwerken in Azure Data Factory gebruiken om toegang te krijgen tot het on-premises netwerk zonder een zelf-hostende Integration Runtime te installeren en te configureren.

Zie Strategieën voor gegevenstoegang voor meer informatie over de netwerkbeveiligingsmechanismen en -opties die door Data Factory worden ondersteund.

Aan de slag

Als u de kopieeractiviteit wilt uitvoeren met een pijplijn, kunt u een van de volgende hulpprogramma's of SDK's gebruiken:

Een gekoppelde service maken voor MongoDB met behulp van de gebruikersinterface

Gebruik de volgende stappen om een gekoppelde service te maken voor MongoDB in de gebruikersinterface van Azure Portal.

  1. Blader naar het tabblad Beheren in uw Azure Data Factory- of Synapse-werkruimte en selecteer Gekoppelde services en klik vervolgens op Nieuw:

  2. Zoek naar MongoDB en selecteer de MongoDB-connector.

    Selecteer de MongoDB-connector.

  3. Configureer de servicedetails, test de verbinding en maak de nieuwe gekoppelde service.

    Configureer een gekoppelde service voor MongoDB.

Configuratiedetails van connector

De volgende secties bevatten details over eigenschappen die worden gebruikt om Data Factory-entiteiten te definiëren die specifiek zijn voor MongoDB-connector.

Eigenschappen van gekoppelde service

De volgende eigenschappen worden ondersteund voor de gekoppelde MongoDB-service:

Eigenschappen Beschrijving Vereist
type De typeeigenschap moet worden ingesteld op: MongoDbV2 Ja
connectionString Geef de MongoDB-verbindingsreeks bijvoorbeeld mongodb://[username:password@]host[:port][/[database][?options]]op. Raadpleeg de MongoDB-handleiding voor verbindingsreeks voor meer informatie.

U kunt ook een verbindingsreeks in Azure Key Vault plaatsen. Raadpleeg De referenties van de Store in Azure Key Vault met meer informatie.
Ja
database De naam van de database waartoe u toegang wilt hebben. Ja
connectVia De Integration Runtime die moet worden gebruikt om verbinding te maken met het gegevensarchief. Meer informatie vindt u in de sectie Vereisten . Als dit niet is opgegeven, wordt de standaard Azure Integration Runtime gebruikt. Nee

Voorbeeld:

{
    "name": "MongoDBLinkedService",
    "properties": {
        "type": "MongoDbV2",
        "typeProperties": {
            "connectionString": "mongodb://[username:password@]host[:port][/[database][?options]]",
            "database": "myDatabase"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Eigenschappen van gegevensset

Zie Gegevenssets en gekoppelde services voor een volledige lijst met secties en eigenschappen die beschikbaar zijn voor het definiëren van gegevenssets. De volgende eigenschappen worden ondersteund voor mongoDB-gegevensset:

Eigenschappen Beschrijving Vereist
type De typeeigenschap van de gegevensset moet worden ingesteld op: MongoDbV2Collection Ja
collectionName De naam van de verzameling in de MongoDB-database. Ja

Voorbeeld:

{
    "name": "MongoDbDataset",
    "properties": {
        "type": "MongoDbV2Collection",
        "typeProperties": {
            "collectionName": "<Collection name>"
        },
        "schema": [],
        "linkedServiceName": {
            "referenceName": "<MongoDB linked service name>",
            "type": "LinkedServiceReference"
        }
    }
}

Eigenschappen van de kopieeractiviteit

Zie het artikel Pijplijnen voor een volledige lijst met secties en eigenschappen die beschikbaar zijn voor het definiëren van activiteiten. Deze sectie bevat een lijst met eigenschappen die worden ondersteund door mongoDB-bron en -sink.

MongoDB als bron

De volgende eigenschappen worden ondersteund in de sectie bron van kopieeractiviteit:

Eigenschappen Beschrijving Vereist
type De typeeigenschap van de bron van de kopieeractiviteit moet worden ingesteld op: MongoDbV2Source Ja
filter Hiermee geeft u selectiefilter op met behulp van queryoperators. Als u alle documenten in een verzameling wilt retourneren, laat u deze parameter weg of geeft u een leeg document ({}) door. Nee
cursorMethods.project Hiermee geeft u de velden die moeten worden geretourneerd in de documenten voor projectie. Als u alle velden in de overeenkomende documenten wilt retourneren, laat u deze parameter weg. Nee
cursorMethods.sort Hiermee geeft u de volgorde op waarin de query overeenkomende documenten retourneert. Raadpleeg cursor.sort(). Nee
cursorMethods.limit Hiermee geeft u het maximum aantal documenten dat de server retourneert. Raadpleeg cursor.limit(). Nee
cursorMethods.skip Hiermee geeft u het aantal documenten dat moet worden overgeslagen en van waaruit MongoDB begint met het retourneren van resultaten. Raadpleeg cursor.skip(). Nee
batchSize Hiermee geeft u het aantal documenten op dat moet worden geretourneerd in elke batch van het antwoord van het MongoDB-exemplaar. In de meeste gevallen heeft het wijzigen van de batchgrootte geen invloed op de gebruiker of de toepassing. Azure Cosmos DB beperkt elke batch mag niet groter zijn dan 40 MB. Dit is de som van de grootte van het batchSize-aantal documenten, dus verklein deze waarde als uw documentgrootte groot is. Nee
(de standaardwaarde is 100)

Tip

De service biedt ondersteuning voor het verbruik van BSON-documenten in de strikte modus. Zorg ervoor dat uw filterquery zich in de strikte modus bevindt in plaats van de Shell-modus. Meer beschrijving vindt u in de Handleiding van MongoDB.

Voorbeeld:

"activities":[
    {
        "name": "CopyFromMongoDB",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<MongoDB input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "MongoDbV2Source",
                "filter": "{datetimeData: {$gte: ISODate(\"2018-12-11T00:00:00.000Z\"),$lt: ISODate(\"2018-12-12T00:00:00.000Z\")}, _id: ObjectId(\"5acd7c3d0000000000000000\") }",
                "cursorMethods": {
                    "project": "{ _id : 1, name : 1, age: 1, datetimeData: 1 }",
                    "sort": "{ age : 1 }",
                    "skip": 3,
                    "limit": 3
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

MongoDB als sink

De volgende eigenschappen worden ondersteund in de sectie Sink voor kopieeractiviteit:

Eigenschappen Beschrijving Vereist
type De typeeigenschap van de sink voor kopieeractiviteit moet worden ingesteld op MongoDbV2Sink. Ja
writeBehavior Hierin wordt beschreven hoe u gegevens naar MongoDB schrijft. Toegestane waarden: invoegen en upsert.

Het gedrag van upsert is om het document te vervangen als er al een document bestaat _id ; anders voegt u het document in.

Opmerking: De service genereert automatisch een _id voor een document als een _id document niet is opgegeven in het oorspronkelijke document of door kolomtoewijzing. Dit betekent dat u ervoor moet zorgen dat uw document een id heeft, zodat upsert werkt zoals verwacht.
Nee
(de standaardwaarde is invoegen)
writeBatchSize De eigenschap writeBatchSize bepaalt de grootte van documenten die in elke batch moeten worden geschreven. U kunt proberen de waarde voor writeBatchSize te verhogen om de prestaties te verbeteren en de waarde te verlagen als uw documentgrootte groot is. Nee
(de standaardwaarde is 10.000)
writeBatchTimeout De wachttijd voordat de batchinvoegbewerking is voltooid voordat er een time-out optreedt. De toegestane waarde is tijdspanne. Nee
(de standaardwaarde is 00:30:00 - 30 minuten)

Tip

Als u JSON-documenten als zodanig wilt importeren, raadpleegt u de sectie JSON-documenten importeren of exporteren. Als u gegevens in tabelvorm wilt kopiëren, raadpleegt u Schematoewijzing.

Voorbeeld

"activities":[
    {
        "name": "CopyToMongoDB",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Document DB output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "MongoDbV2Sink",
                "writeBehavior": "upsert"
            }
        }
    }
]

JSON-documenten importeren en exporteren

U kunt deze MongoDB-connector gebruiken om eenvoudig het volgende te doen:

  • Documenten kopiëren tussen twee MongoDB-verzamelingen.
  • Importeer JSON-documenten uit verschillende bronnen naar MongoDB, waaronder van Azure Cosmos DB, Azure Blob Storage, Azure Data Lake Store en andere ondersteunde opslag op basis van bestanden.
  • Exporteer JSON-documenten uit een MongoDB-verzameling naar verschillende bestandsarchieven.

Als u een dergelijke schemaagnostische kopie wilt bereiken, slaat u de sectie 'structuur' (ook wel schema genoemd) over in de gegevensset en schematoewijzing in de kopieeractiviteit.

Schematoewijzing

Als u gegevens van MongoDB wilt kopiëren naar een tabelvormige sink of omgekeerd, raadpleegt u schematoewijzing.

De gekoppelde MongoDB-service upgraden

Hier volgen stappen waarmee u uw gekoppelde service en gerelateerde query's kunt upgraden:

  1. Maak een nieuwe gekoppelde MongoDB-service en configureer deze door te verwijzen naar de eigenschappen van de gekoppelde service.

  2. Als u SQL-query's in uw pijplijnen gebruikt die verwijzen naar de oude gekoppelde MongoDB-service, vervangt u deze door de equivalente MongoDB-query's. Zie de volgende tabel voor de vervangingsvoorbeelden:

    SQL-query Equivalente MongoDB-query
    SELECT * FROM users db.users.find({})
    SELECT username, age FROM users db.users.find({}, {username: 1, age: 1})
    SELECT username AS User, age AS Age, statusNumber AS Status, CASE WHEN Status = 0 THEN "Pending" CASE WHEN Status = 1 THEN "Finished" ELSE "Unknown" END AS statusEnum LastUpdatedTime + interval '2' hour AS NewLastUpdatedTime FROM users db.users.aggregate([{ $project: { _id: 0, User: "$username", Age: "$age", Status: "$statusNumber", statusEnum: { $switch: { branches: [ { case: { $eq: ["$Status", 0] }, then: "Pending" }, { case: { $eq: ["$Status", 1] }, then: "Finished" } ], default: "Unknown" } }, NewLastUpdatedTime: { $add: ["$LastUpdatedTime", 2 * 60 * 60 * 1000] } } }])
    SELECT employees.name, departments.name AS department_name FROM employees LEFT JOIN departments ON employees.department_id = departments.id; db.employees.aggregate([ { $lookup: { from: "departments", localField: "department_id", foreignField: "_id", as: "department" } }, { $unwind: "$department" }, { $project: { _id: 0, name: 1, department_name: "$department.name" } } ])

Zie ondersteunde gegevensarchieven voor een lijst met gegevensarchieven die worden ondersteund als bronnen en sinks door de kopieeractiviteit.