Sdílet prostřednictvím


Klientská knihovna služby Azure Event Hubs pro JavaScript – verze 5.12.1

Azure Event Hubs je vysoce škálovatelná služba publikování a odběru, která dokáže ingestovat miliony událostí za sekundu a streamovat je více příjemcům. Díky tomu můžete zpracovávat a analyzovat obrovské objemy dat vytvořených připojenými zařízeními a aplikacemi. Pokud se chcete dozvědět více o službě Azure Event Hubs, můžete si projít: Co je služba Event Hubs?

Klientská knihovna Azure Event Hubs umožňuje odesílat a přijímat události ve vaší Node.js aplikaci.

Klíčové odkazy:

  • zdrojového kódu
  • balíčku (npm)
  • Referenční dokumentace k rozhraní API
  • dokumentace k produktu
  • ukázky

POZNÁMKA: Pokud používáte verzi 2.1.0 nebo nižší a chcete migrovat na nejnovější verzi tohoto balíčku, projděte si našeho průvodce migrací a přejděte z EventHubs V2 na EventHubs V5.

Ukázky pro v2 a dokumentaci jsou stále k dispozici tady:

zdrojový kód pro balíček | v2.1.0 (npm) | pro verze 2.1.0

Začínáme

Instalace balíčku

Instalace klientské knihovny Azure Event Hubs pomocí npm

npm install @azure/event-hubs

Aktuálně podporovaná prostředí

Další podrobnosti najdete v našich zásadách podpory .

Požadavky

  • Předplatné Azure
  • oboru názvů služby Event Hubs

Konfigurace TypeScriptu

Uživatelé TypeScriptu musí mít nainstalované definice typu Node:

npm install @types/node

Musíte také povolit compilerOptions.allowSyntheticDefaultImports ve svém tsconfig.json. Všimněte si, že pokud jste povolili compilerOptions.esModuleInterop, allowSyntheticDefaultImports je ve výchozím nastavení povolená. Další informace najdete v příručce možnosti kompilátoru TypeScriptu.

JavaScript Bundle

Pokud chcete tuto klientskou knihovnu použít v prohlížeči, musíte nejprve použít bundler. Podrobnosti o tom, jak to udělat, najdete v naší dokumentaci sdružování.

Kromě toho, co je zde popsáno, tato knihovna také potřebuje další polyfilly pro následující integrované moduly NodeJS, aby fungovala správně v prohlížečích:

  • buffer
  • os
  • path
  • process

Sdružování s Webpackem

Pokud používáte Webpack v5, můžete nainstalovat následující vývojové závislosti.

  • npm install --save-dev os-browserify path-browserify

pak do svého webpack.config.js přidejte následující:

 const path = require("path");
+const webpack = require("webpack");

 module.exports = {
   entry: "./src/index.ts",
@@ -12,8 +13,21 @@ module.exports = {
       },
     ],
   },
+  plugins: [
+    new webpack.ProvidePlugin({
+      process: "process/browser",
+    }),
+    new webpack.ProvidePlugin({
+      Buffer: ["buffer", "Buffer"],
+    }),
+  ],
   resolve: {
     extensions: [".ts", ".js"],
+    fallback: {
+      buffer: require.resolve("buffer/"),
+      os: require.resolve("os-browserify"),
+      path: require.resolve("path-browserify"),
+    },
   },

Sdružování s kumulativními funkcemi

Pokud používáte rollup bundler, nainstalujte následující vývojové závislosti.

  • npm install --save-dev @rollup/plugin-commonjs @rollup/plugin-inject @rollup/plugin-node-resolve

Pak do svého rollup.config.js zahrňte následující:

+import nodeResolve from "@rollup/plugin-node-resolve";
+import cjs from "@rollup/plugin-commonjs";
+import shim from "rollup-plugin-shim";
+import inject from "@rollup/plugin-inject";

export default {
  // other configs
  plugins: [
+    shim({
+      fs: `export default {}`,
+      net: `export default {}`,
+      tls: `export default {}`,
+      path: `export default {}`,
+      dns: `export function resolve() { }`,
+    }),
+    nodeResolve({
+      mainFields: ["module", "browser"],
+      preferBuiltins: false,
+    }),
+    cjs(),
+    inject({
+      modules: {
+        Buffer: ["buffer", "Buffer"],
+        process: "process",
+      },
+      exclude: ["./**/package.json"],
+    }),
  ]
};

Další informace o používání polyfills najdete v dokumentaci k vašemu oblíbenému bundleru.

Nativní podpora Reactu

Podobně jako v prohlížečích react Native nepodporuje některé javascriptové rozhraní API používané touto knihovnou sady SDK, takže pro ně musíte poskytnout polyfills. Další podrobnosti najdete v ukázce Messaging React Native s expo.

Ověření klienta

Interakce se službou Event Hubs začíná instancí třídy EventHubConsumerClient nebo instancí třídy EventHubProducerClient. Existují přetížení konstruktoru, které podporují různé způsoby vytvoření instance těchto tříd, jak je znázorněno níže:

Použití připojovacího řetězce pro obor názvů služby Event Hubs

Jedno z přetížení konstruktoru přebírá připojovací řetězec formuláře Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key; a název entity k instanci centra událostí. Můžete vytvořit skupinu příjemců a získat připojovací řetězec i název entity z webu portálu Azure Portal.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string", "my-event-hub");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string",
  "my-event-hub"
);

Použití připojovacího řetězce pro zásady v centru událostí

Další přetížení konstruktoru přebírá připojovací řetězec odpovídající zásadám sdíleného přístupu, které jste definovali přímo v instanci centra událostí (nikoli v oboru názvů Event Hubs). Tento připojovací řetězec bude Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-event-hub-nameformuláře . Klíčovým rozdílem ve formátu připojovacího řetězce z předchozího přetížení konstruktoru je ;EntityPath=my-event-hub-name.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string-with-entity-path");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string-with-entity-path"
);

Použití oboru názvů služby Event Hubs a identity Azure

Toto přetížení konstruktoru přebírá název hostitele a název entity vaší instance centra událostí a pověření, které implementuje tokenCredential rozhraní. To umožňuje ověřování pomocí objektu zabezpečení Azure Active Directory. V balíčku @azure/identity jsou k dispozici implementace rozhraní TokenCredential. Název hostitele je ve formátu <yournamespace>.servicebus.windows.net. Pokud používáte Azure Active Directory, musí mít objekt zabezpečení přiřazenou roli, která umožňuje přístup ke službě Event Hubs, jako je například role Vlastník dat služby Azure Event Hubs. Další informace o používání autorizace Azure Active Directory se službou Event Hubs najdete v přidružené dokumentaci.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const { DefaultAzureCredential } = require("@azure/identity");
const credential = new DefaultAzureCredential();
const producerClient = new EventHubProducerClient("my-host-name", "my-event-hub", credential);
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-host-name",
  "my-event-hub",
  credential
);

Klíčové koncepty

  • Producent centra událostí je zdrojem telemetrických dat, diagnostických informací, protokolů o využití nebo jiných dat protokolu v rámci řešení vloženého zařízení, aplikace mobilního zařízení, názvu hry běžícího na konzole nebo jiném zařízení, některého obchodního řešení založeného na klientovi nebo serveru nebo webu.

  • Příjemce centra událostí tyto informace vybere z centra událostí a zpracuje je. Zpracování může zahrnovat agregaci, komplexní výpočty a filtrování. Zpracování může zahrnovat také distribuci nebo uložení informací nezpracovaným nebo transformovaným způsobem. Příjemci centra událostí jsou často robustní a vysoce škálovatelné součásti infrastruktury platformy s integrovanými analytickými možnostmi, jako jsou Azure Stream Analytics, Apache Spark nebo Apache Storm.

  • Oddíl je uspořádaná posloupnost událostí, které se uchovávají v centru událostí. Oddíly jsou prostředkem organizace dat přidružené k paralelismu vyžadovaným příjemci událostí. Azure Event Hubs poskytuje streamování zpráv prostřednictvím modelu dělených příjemců, ve kterém každý příjemce čte pouze určitou podmnožinu nebo oddíl streamu zpráv. Při příchodu novějších událostí se přidají na konec této sekvence. Počet oddílů je zadaný v okamžiku vytvoření centra událostí a nelze ho změnit.

  • Skupina uživatelů je zobrazení celého centra událostí. Skupiny uživatelů umožňují, aby každý z nich mohl využívat více aplikací, měl samostatné zobrazení datového proudu událostí a aby stream četl samostatně vlastním tempem a z vlastní pozice. V oddílu na skupinu příjemců může být maximálně 5 souběžných čtenářů; Doporučuje se však, aby pro dané spárování oddílů a skupin příjemců existoval pouze jeden aktivní příjemce. Každý aktivní čtenář obdrží všechny události ze svého oddílu; Pokud je ve stejném oddílu více čtenářů, obdrží duplicitní události.

Další koncepty a hlubší diskuze najdete v tématu: funkce služby Event Hubs

Pokyny k opakováním

EventHubConsumerClient a EventHubProducerClient přijímají options, kde můžete nastavit retryOptions, které vám umožní ladit, jak sada SDK zpracovává přechodné chyby. Mezi příklady přechodných chyb patří dočasné problémy se sítí nebo službami.

Opakování při využívání událostí

Pokud dojde k přechodné chybě (např. k dočasnému problému se sítí), když sada SDK přijímá události, bude se opakovat příjem událostí na základě možností opakování předaných do EventHubConsumerClient. Pokud dojde k vyčerpání maximálního počtu pokusů o opakování, vyvolá se funkce processError.

Pomocí nastavení opakování můžete řídit, jak rychle se dozvíte o dočasných problémech, jako je problém s připojením k síti. Pokud například potřebujete vědět, kdy dojde k problému se sítí, můžete snížit hodnoty pro maxRetries a retryDelayInMs.

Po spuštění funkce processError klient nadále přijímá události z oddílu, pokud byla chyba opakovatelná. Jinak klient vyvolá funkci processClose poskytovanou uživatelem. Tato funkce se také vyvolá, když zastavíte odběr nebo když klient přestane číst události z aktuálního oddílu, protože je vyzvedne jiná instance vaší aplikace jako součást vyrovnávání zatížení.

Funkce processClose poskytuje příležitost aktualizovat kontrolní body v případě potřeby. Po spuštění processCloseklient (nebo v případě vyrovnávání zatížení, klient z jiné instance aplikace) vyvolá funkci poskytovanou uživatelem processInitialize, která obnoví čtení událostí z posledního aktualizovaného kontrolního bodu pro stejný oddíl.

Pokud chcete ukončit pokus o čtení událostí, musíte zavolat close() na subscription vrácenou metodou subscribe.

Příklady

Následující části obsahují fragmenty kódu, které pokrývají některé běžné úlohy pomocí služby Azure Event Hubs.

Kontrola centra událostí

Mnoho operací centra událostí probíhá v rámci rozsahu konkrétního oddílu. Vzhledem k tomu, že oddíly vlastní centrum událostí, jejich názvy se přiřazují při vytváření. Pokud chcete zjistit, jaké oddíly jsou k dispozici, dotazujete centrum událostí pomocí některého ze dvou dostupných klientů: EventHubProducerClient nebo EventHubConsumerClient

V následujícím příkladu používáme EventHubProducerClient.

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubProducerClient("connectionString", "eventHubName");

  const partitionIds = await client.getPartitionIds();

  await client.close();
}

main();

Publikování událostí do centra událostí

Pokud chcete publikovat události, budete muset vytvořit EventHubProducerClient. Následující příklad ukazuje jeden ze způsobů, jak vytvořit klienta, najdete v části Ověření klienta informace o dalších způsobech vytvoření instance klienta.

Události můžete publikovat do konkrétního oddílu nebo povolit službě Event Hubs rozhodnout, do kterého oddílu se mají události publikovat. Doporučujeme použít automatické směrování, když publikování událostí musí být vysoce dostupné nebo kdy se data událostí mají rovnoměrně distribuovat mezi oddíly. V následujícím příkladu využijeme automatické směrování.

  • Vytvoření objektu EventDataBatch pomocí createBatch
  • Přidejte události do dávky pomocí metody tryAdd. Můžete to udělat, dokud nedosáhnete maximálního limitu velikosti dávky nebo dokud nepřidáte počet událostí, které se vám líbí, podle toho, co nastane dříve. Tato metoda by vrátila false, aby bylo možné do dávky přidat žádné další události z důvodu dosažení maximální velikosti dávky.
  • Odešle dávku událostí pomocí metody sendBatch.

V následujícím příkladu se pokusíme do služby Azure Event Hubs odeslat 10 událostí.

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const producerClient = new EventHubProducerClient("connectionString", "eventHubName");

  const eventDataBatch = await producerClient.createBatch();
  let numberOfEventsToSend = 10;

  while (numberOfEventsToSend > 0) {
    const wasAdded = eventDataBatch.tryAdd({ body: "my-event-body" });
    if (!wasAdded) {
      break;
    }
    numberOfEventsToSend--;
  }

  await producerClient.sendBatch(eventDataBatch);
  await producerClient.close();
}

main();

Existují možnosti, které můžete předat v různých fázích, abyste mohli řídit proces odesílání událostí do služby Azure Event Hubs.

  • Konstruktor EventHubProducerClient přebírá volitelný parametr typu EventHubClientOptions, který můžete použít k určení možností, jako je počet opakování.
  • Metoda createBatch přebírá volitelný parametr typu CreateBatchOptions, který můžete použít ke speicify maximální velikosti dávky podporované vytvořením dávky.
  • Metoda sendBatch přebírá volitelný parametr typu SendBatchOptions, který můžete použít k určení abortSignal pro zrušení aktuální operace.
  • V případě, že chcete odeslat do konkrétního oddílu, přetížení metody sendBatch umožňuje předat ID oddílu pro odesílání událostí. Výše uvedený příklad Kontrola centra událostí ukazuje, jak načíst id dostupných oddílů.

Poznámka: Při práci se službou Azure Stream Analytics by text události, která se odesílá, měl být také objekt JSON. Příklad: body: { "message": "Hello World" }

Využívání událostí z centra událostí

Pokud chcete využívat události z instance centra událostí, musíte také vědět, na kterou skupinu příjemců chcete cílit. Jakmile to víte, jste připraveni vytvořit EventHubConsumerClient. Následující příklad ukazuje jeden ze způsobů, jak vytvořit klienta, najdete v části Ověření klienta informace o dalších způsobech vytvoření instance klienta.

Metoda subscribe na klientovi má přetížení, které v kombinaci s konstruktorem může vyhovět několika způsobům využití událostí:

Metoda subscribe přebírá volitelný parametr typu SubscriptionOptions, který můžete použít k určení možností, jako je maxBatchSize (počet událostí, na které se má čekat) a maxWaitTimeInSeconds (doba čekání na přijetí událostí maxBatchSize).

Využívání událostí v jednom procesu

Začněte tím, že vytvoříte instanci EventHubConsumerClienta potom zavoláte metodu subscribe(), která začne využívat události.

Metoda subscribe přebírá zpětná volání ke zpracování událostí při jejich přijetí ze služby Azure Event Hubs. Chcete-li zastavit příjem událostí, můžete volat close() na objekt vrácený metodou subscribe().

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

Využívání událostí s vyrovnáváním zatížení napříč několika procesy

Azure Event Hubs dokáže pracovat s miliony událostí za sekundu. Pokud chcete škálovat aplikaci pro zpracování, můžete spustit několik instancí aplikace a vyrovnávat zatížení mezi sebou.

Začněte vytvořením instance EventHubConsumerClient pomocí jednoho z přetížení konstruktoru, který vezme CheckpointStorea potom zavolá subscribe() metoda začít využívat události. Úložiště kontrolních bodů umožní odběratelům ve skupině příjemců koordinovat zpracování mezi několika instancemi vaší aplikace.

V tomto příkladu použijeme BlobCheckpointStore z balíčku @azure/eventhubs-checkpointstore-blob, který implementuje požadované čtení a zápisy do odolného úložiště pomocí služby Azure Blob Storage.

Metoda subscribe přebírá zpětná volání ke zpracování událostí při jejich přijetí ze služby Azure Event Hubs. Chcete-li zastavit příjem událostí, můžete volat close() na objekt vrácený metodou subscribe().

const { EventHubConsumerClient } = require("@azure/event-hubs");
const { ContainerClient } = require("@azure/storage-blob");
const { BlobCheckpointStore } = require("@azure/eventhubs-checkpointstore-blob");

const storageAccountConnectionString = "storage-account-connection-string";
const containerName = "container-name";
const eventHubConnectionString = "eventhub-connection-string";
const consumerGroup = "my-consumer-group";
const eventHubName = "eventHubName";

async function main() {
  const blobContainerClient = new ContainerClient(storageAccountConnectionString, containerName);

  if (!(await blobContainerClient.exists())) {
    await blobContainerClient.create();
  }

  const checkpointStore = new BlobCheckpointStore(blobContainerClient);
  const consumerClient = new EventHubConsumerClient(
    consumerGroup,
    eventHubConnectionString,
    eventHubName,
    checkpointStore
  );

  const subscription = consumerClient.subscribe({
    processEvents: async (events, context) => {
      // event processing code goes here
      if (events.length === 0) {
        // If the wait time expires (configured via options in maxWaitTimeInSeconds) Event Hubs
        // will pass you an empty array.
        return;
      }

      // Checkpointing will allow your service to pick up from
      // where it left off when restarting.
      //
      // You'll want to balance how often you checkpoint with the
      // performance of your underlying checkpoint store.
      await context.updateCheckpoint(events[events.length - 1]);
    },
    processError: async (err, context) => {
      // handle any errors that occur during the course of
      // this subscription
      console.log(`Errors in subscription to partition ${context.partitionId}: ${err}`);
    }
  });

  // Wait for a few seconds to receive events before closing
  await new Promise((resolve) => setTimeout(resolve, 10 * 1000));

  await subscription.close();
  await consumerClient.close();
  console.log(`Exiting sample`);
}

main();

Další informace najdete v tématu Vyrovnávání zatížení oddílů napříč několika instancemi vaší aplikace.

Využívání událostí z jednoho oddílu

Začněte tím, že vytvoříte instanci EventHubConsumerClienta potom zavoláte metodu subscribe(), která začne využívat události. Předejte ID oddílu, na který chcete cílit, do metody subscribe(), aby se spotřebovalo pouze z tohoto oddílu.

V následujícím příkladu používáme první oddíl.

Metoda subscribe přebírá zpětná volání ke zpracování událostí při jejich přijetí ze služby Azure Event Hubs. Chcete-li zastavit příjem událostí, můžete volat close() na objekt vrácený metodou subscribe().

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );
  const partitionIds = await client.getPartitionIds();

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    partitionIds[0],
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

Práce s IotHubem pomocí EventHubConsumerClient

K práci s IotHubem můžete použít také EventHubConsumerClient. To je užitečné pro příjem telemetrických dat IotHubu z propojeného EventHubu. Přidružený připojovací řetězec nebude obsahovat deklarace identity, takže odesílání událostí není možné.

  • Všimněte si, že připojovací řetězec musí být pro koncový bod kompatibilní s centrem událostí (např. "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name")
const { EventHubConsumerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name"
  );
  await client.getEventHubProperties();
  // retrieve partitionIds from client.getEventHubProperties() or client.getPartitionIds()
  const partitionId = "0";
  await client.getPartitionProperties(partitionId);

  await client.close();
}

main();

Řešení problémů

Závislosti AMQP

Knihovna Event Hubs závisí na knihovně rhea-promise ke správě připojení, odesílání a přijímání událostí přes protokol AMQP.

Protokolování

Proměnnou prostředí AZURE_LOG_LEVEL můžete nastavit tak, aby se protokolování povolilo stderr:

export AZURE_LOG_LEVEL=verbose

Podrobnější pokyny k povolení protokolů najdete v dokumentaci k @azure/protokolovacímu balíčku.

Případně můžete nastavit proměnnou prostředí DEBUG pro získání protokolů při použití této knihovny. To může být užitečné, pokud chcete také generovat protokoly ze závislostí rhea-promise a rhea.

Poznámka: AZURE_LOG_LEVEL, pokud je nastavena, má přednost před laděním. Při zadávání AZURE_LOG_LEVEL nebo volání setLogLevel nezadávejte žádné knihovny azure prostřednictvím ladění.

  • Získání protokolů ladění na úrovni informací ze sady Event Hubs SDK
export DEBUG=azure:*:info
  • Získání protokolů ladění ze sady Event Hubs SDK a knihovny na úrovni protokolu
export DEBUG=azure*,rhea*
  • Pokud nechcete zobrazovat nezpracovaná data událostí (což spotřebovává velké množství místa na konzole nebo disku), můžete proměnnou prostředí DEBUG nastavit následujícím způsobem:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message
  • Pokud vás zajímají jenom chyby a upozornění sady SDK , můžete proměnnou prostředí DEBUG nastavit následujícím způsobem:
export DEBUG=azure:*:(error|warning),rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow

Další kroky

Další ukázkový kód

Podrobné příklady použití této knihovny k odesílání a přijímání událostí do a z služby Event Hubsnajdete vukázkách .

Přispívající

Pokud chcete přispívat do této knihovny, přečtěte si průvodce přispívání a přečtěte si další informace o vytváření a testování kódu.

imprese