Udostępnij za pośrednictwem


Kopiowanie danych z lub do bazy danych MongoDB przy użyciu usługi Azure Data Factory lub Synapse Analytics

DOTYCZY: Azure Data Factory Azure Synapse Analytics

Napiwek

Wypróbuj usługę Data Factory w usłudze Microsoft Fabric — rozwiązanie analityczne typu all-in-one dla przedsiębiorstw. Usługa Microsoft Fabric obejmuje wszystko, od przenoszenia danych do nauki o danych, analizy w czasie rzeczywistym, analizy biznesowej i raportowania. Dowiedz się, jak bezpłatnie rozpocząć nową wersję próbną !

W tym artykule opisano sposób używania działania kopiowania w potokach usługi Azure Data Factory Synapse Analytics do kopiowania danych z bazy danych MongoDB i do bazy danych MongoDB. Jest on oparty na artykule omówienie działania kopiowania, który przedstawia ogólne omówienie działania kopiowania.

Ważne

Nowy łącznik bazy danych MongoDB zapewnia ulepszoną natywną obsługę bazy danych MongoDB. Jeśli używasz starszego łącznika bazy danych MongoDB w swoim rozwiązaniu, obsługiwanej zgodnie ze zgodnością z poprzednimi wersjami, zapoznaj się z artykułem Dotyczącym łącznika bazy danych MongoDB (starsza wersja).

Obsługiwane możliwości

Ten łącznik bazy danych MongoDB jest obsługiwany w przypadku następujących możliwości:

Obsługiwane możliwości IR
działanie Kopiuj (źródło/ujście) (1) (2)

(1) Środowisko Azure Integration Runtime (2) Self-hosted Integration Runtime

Aby uzyskać listę magazynów danych obsługiwanych jako źródła/ujścia, zobacz tabelę Obsługiwane magazyny danych.

W szczególności ten łącznik bazy danych MongoDB obsługuje wersje do 4.2. Jeśli twoja praca wymaga nowszych wersji niż 4.2, rozważ użycie usługi MongoDB Atlas z łącznikiem Usługi MongoDB Atlas, który zapewnia bardziej kompleksową obsługę i funkcje.

Wymagania wstępne

Jeśli magazyn danych znajduje się wewnątrz sieci lokalnej, sieci wirtualnej platformy Azure lub chmury prywatnej Amazon Virtual, musisz skonfigurować własne środowisko Integration Runtime , aby się z nim połączyć.

Jeśli magazyn danych jest zarządzaną usługą danych w chmurze, możesz użyć środowiska Azure Integration Runtime. Jeśli dostęp jest ograniczony do adresów IP zatwierdzonych w regułach zapory, możesz dodać adresy IP środowiska Azure Integration Runtime do listy dozwolonych.

Możesz również użyć funkcji środowiska Integration Runtime zarządzanej sieci wirtualnej w usłudze Azure Data Factory, aby uzyskać dostęp do sieci lokalnej bez instalowania i konfigurowania własnego środowiska Integration Runtime.

Aby uzyskać więcej informacji na temat mechanizmów zabezpieczeń sieci i opcji obsługiwanych przez usługę Data Factory, zobacz Strategie dostępu do danych.

Wprowadzenie

Aby wykonać działanie Kopiuj za pomocą potoku, możesz użyć jednego z następujących narzędzi lub zestawów SDK:

Tworzenie połączonej usługi z bazą danych MongoDB przy użyciu interfejsu użytkownika

Wykonaj poniższe kroki, aby utworzyć połączoną usługę z bazą danych MongoDB w interfejsie użytkownika witryny Azure Portal.

  1. Przejdź do karty Zarządzanie w obszarze roboczym usługi Azure Data Factory lub Synapse i wybierz pozycję Połączone usługi, a następnie kliknij pozycję Nowy:

  2. Wyszukaj pozycję MongoDB i wybierz łącznik MongoDB.

    Wybierz łącznik MongoDB.

  3. Skonfiguruj szczegóły usługi, przetestuj połączenie i utwórz nową połączoną usługę.

    Skonfiguruj połączoną usługę z bazą danych MongoDB.

Szczegóły konfiguracji łącznika

Poniższe sekcje zawierają szczegółowe informacje o właściwościach używanych do definiowania jednostek usługi Data Factory specyficznych dla łącznika bazy danych MongoDB.

Właściwości połączonej usługi

Następujące właściwości są obsługiwane w przypadku połączonej usługi MongoDB:

Właściwości Opis Wymagania
type Właściwość type musi być ustawiona na: MongoDbV2 Tak
Parametry połączenia Określ parametry połączenia mongoDB, np. mongodb://[username:password@]host[:port][/[database][?options]]. Aby uzyskać więcej informacji, zapoznaj się z instrukcjami dotyczącymi bazy danych MongoDB w parametry połączenia.

Możesz również umieścić parametry połączenia w usłudze Azure Key Vault. Aby uzyskać więcej informacji, zobacz Przechowywanie poświadczeń w usłudze Azure Key Vault .
Tak
database Nazwa bazy danych, do której chcesz uzyskać dostęp. Tak
connectVia Środowisko Integration Runtime do nawiązania połączenia z magazynem danych. Dowiedz się więcej w sekcji Wymagania wstępne . Jeśli nie zostanie określony, używa domyślnego środowiska Azure Integration Runtime. Nie.

Przykład:

{
    "name": "MongoDBLinkedService",
    "properties": {
        "type": "MongoDbV2",
        "typeProperties": {
            "connectionString": "mongodb://[username:password@]host[:port][/[database][?options]]",
            "database": "myDatabase"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Właściwości zestawu danych

Aby uzyskać pełną listę sekcji i właściwości dostępnych do definiowania zestawów danych, zobacz Zestawy danych i połączone usługi. Następujące właściwości są obsługiwane w przypadku zestawu danych MongoDB:

Właściwości Opis Wymagania
type Właściwość type zestawu danych musi być ustawiona na: MongoDbV2Collection Tak
collectionName Nazwa kolekcji w bazie danych MongoDB. Tak

Przykład:

{
    "name": "MongoDbDataset",
    "properties": {
        "type": "MongoDbV2Collection",
        "typeProperties": {
            "collectionName": "<Collection name>"
        },
        "schema": [],
        "linkedServiceName": {
            "referenceName": "<MongoDB linked service name>",
            "type": "LinkedServiceReference"
        }
    }
}

Właściwości działania kopiowania

Aby uzyskać pełną listę sekcji i właściwości dostępnych do definiowania działań, zobacz artykuł Pipelines (Potoki ). Ta sekcja zawiera listę właściwości obsługiwanych przez źródło i ujście bazy danych MongoDB.

MongoDB jako źródło

Następujące właściwości są obsługiwane w sekcji źródło działania kopiowania:

Właściwości Opis Wymagania
type Właściwość type źródła działania kopiowania musi być ustawiona na: MongoDbV2Source Tak
filtr Określa filtr wyboru przy użyciu operatorów zapytań. Aby zwrócić wszystkie dokumenty w kolekcji, pomiń ten parametr lub przekaż pusty dokument ({}). Nie.
cursorMethods.project Określa pola, które mają być zwracane w dokumentach na potrzeby projekcji. Aby zwrócić wszystkie pola w pasujących dokumentach, pomiń ten parametr. Nie.
cursorMethods.sort Określa kolejność, w której zapytanie zwraca pasujące dokumenty. Zapoznaj się z tematem cursor.sort(). Nie.
cursorMethods.limit Określa maksymalną liczbę dokumentów zwracanych przez serwer. Zapoznaj się z tematem cursor.limit(). Nie.
cursorMethods.skip Określa liczbę dokumentów do pominięcia i od miejsca, w którym baza MongoDB zaczyna zwracać wyniki. Zapoznaj się z elementem cursor.skip(). Nie.
batchSize Określa liczbę dokumentów, które mają być zwracane w każdej partii odpowiedzi z wystąpienia bazy danych MongoDB. W większości przypadków modyfikowanie rozmiaru partii nie wpłynie na użytkownika ani aplikację. Usługa Azure Cosmos DB ogranicza rozmiar każdej partii nie może przekraczać 40 MB rozmiaru, czyli sumy rozmiaru wsadowego Rozmiaru dokumentów, więc zmniejsz tę wartość, jeśli rozmiar dokumentu jest duży. Nie.
(wartość domyślna to 100)

Napiwek

Usługa obsługuje korzystanie z dokumentu BSON w trybie ścisłym. Upewnij się, że zapytanie filtru jest w trybie ścisłym zamiast w trybie powłoki. Więcej informacji można znaleźć w podręczniku bazy danych MongoDB.

Przykład:

"activities":[
    {
        "name": "CopyFromMongoDB",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<MongoDB input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "MongoDbV2Source",
                "filter": "{datetimeData: {$gte: ISODate(\"2018-12-11T00:00:00.000Z\"),$lt: ISODate(\"2018-12-12T00:00:00.000Z\")}, _id: ObjectId(\"5acd7c3d0000000000000000\") }",
                "cursorMethods": {
                    "project": "{ _id : 1, name : 1, age: 1, datetimeData: 1 }",
                    "sort": "{ age : 1 }",
                    "skip": 3,
                    "limit": 3
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

MongoDB jako ujście

Następujące właściwości są obsługiwane w sekcji ujścia działania kopiowania:

Właściwości Opis Wymagania
type Właściwość type ujścia działania kopiowania musi być ustawiona na MongoDbV2Sink. Tak
writeBehavior Opisuje sposób zapisywania danych w bazie danych MongoDB. Dozwolone wartości: wstawianie i upsert.

Zachowanie operacji upsert polega na zastąpieniu dokumentu, jeśli dokument o tym samym _id już istnieje; w przeciwnym razie wstaw dokument.

Uwaga: usługa automatycznie generuje _id element dla dokumentu, jeśli _id element nie został określony w oryginalnym dokumencie lub według mapowania kolumn. Oznacza to, że należy upewnić się, że aby operacja upsert działała zgodnie z oczekiwaniami, dokument ma identyfikator.
Nie.
(wartość domyślna to wstawianie)
writeBatchSize Właściwość writeBatchSize kontroluje rozmiar dokumentów do zapisu w każdej partii. Możesz spróbować zwiększyć wartość parametru writeBatchSize , aby zwiększyć wydajność i zmniejszyć wartość, jeśli rozmiar dokumentu jest duży. Nie.
(wartość domyślna to 10 000)
writeBatchTimeout Czas oczekiwania na zakończenie operacji wstawiania wsadowego przed upływem limitu czasu. Dozwolona wartość to przedział czasu. Nie.
(wartość domyślna to 00:30:00 –30 minut)

Napiwek

Aby zaimportować dokumenty JSON zgodnie z rzeczywistym użyciem, zapoznaj się z sekcją Importowanie lub eksportowanie dokumentów JSON. Aby skopiować z danych w kształcie tabelarycznego, zapoznaj się z mapowaniem schematu.

Przykład

"activities":[
    {
        "name": "CopyToMongoDB",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Document DB output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "MongoDbV2Sink",
                "writeBehavior": "upsert"
            }
        }
    }
]

Importowanie i eksportowanie dokumentów JSON

Możesz użyć tego łącznika bazy danych MongoDB, aby łatwo:

  • Kopiowanie dokumentów między dwiema kolekcjami bazy danych MongoDB w następujący sposób.
  • Zaimportuj dokumenty JSON z różnych źródeł do bazy danych MongoDB, w tym z usług Azure Cosmos DB, Azure Blob Storage, Azure Data Lake Store i innych obsługiwanych magazynów opartych na plikach.
  • Eksportowanie dokumentów JSON z kolekcji bazy danych MongoDB do różnych magazynów opartych na plikach.

Aby uzyskać taką niezależną kopię schematu, pomiń sekcję "struktura" (nazywaną również schematem) w zestawie danych i mapowaniu schematu w działaniu kopiowania.

Mapowanie schematu

Aby skopiować dane z bazy danych MongoDB do ujścia tabelarycznego lub odwróconego, zapoznaj się z mapowaniem schematu.

Uaktualnianie połączonej usługi MongoDB

Poniżej przedstawiono kroki ułatwiające uaktualnienie połączonej usługi i powiązanych zapytań:

  1. Utwórz nową połączoną usługę MongoDB i skonfiguruj ją, odwołując się do właściwości połączonej usługi.

  2. Jeśli używasz zapytań SQL w potokach odwołujących się do starej połączonej usługi MongoDB, zastąp je równoważnymi zapytaniami bazy danych MongoDB. Zapoznaj się z poniższą tabelą, aby zapoznać się z przykładami zastępczymi:

    Zapytanie SQL Równoważne zapytanie bazy danych MongoDB
    SELECT * FROM users db.users.find({})
    SELECT username, age FROM users db.users.find({}, {username: 1, age: 1})
    SELECT username AS User, age AS Age, statusNumber AS Status, CASE WHEN Status = 0 THEN "Pending" CASE WHEN Status = 1 THEN "Finished" ELSE "Unknown" END AS statusEnum LastUpdatedTime + interval '2' hour AS NewLastUpdatedTime FROM users db.users.aggregate([{ $project: { _id: 0, User: "$username", Age: "$age", Status: "$statusNumber", statusEnum: { $switch: { branches: [ { case: { $eq: ["$Status", 0] }, then: "Pending" }, { case: { $eq: ["$Status", 1] }, then: "Finished" } ], default: "Unknown" } }, NewLastUpdatedTime: { $add: ["$LastUpdatedTime", 2 * 60 * 60 * 1000] } } }])
    SELECT employees.name, departments.name AS department_name FROM employees LEFT JOIN departments ON employees.department_id = departments.id; db.employees.aggregate([ { $lookup: { from: "departments", localField: "department_id", foreignField: "_id", as: "department" } }, { $unwind: "$department" }, { $project: { _id: 0, name: 1, department_name: "$department.name" } } ])

Aby uzyskać listę magazynów danych obsługiwanych jako źródła i ujścia działania kopiowania, zobacz obsługiwane magazyny danych.