Klientská knihovna služby Azure Event Hubs pro JavaScript – verze 5.12.1
Azure Event Hubs je vysoce škálovatelná služba publikování a odběru, která dokáže ingestovat miliony událostí za sekundu a streamovat je více příjemcům. Díky tomu můžete zpracovávat a analyzovat obrovské objemy dat vytvořených připojenými zařízeními a aplikacemi. Pokud se chcete dozvědět více o službě Azure Event Hubs, můžete si projít: Co je služba Event Hubs?
Klientská knihovna Azure Event Hubs umožňuje odesílat a přijímat události ve vaší Node.js aplikaci.
Klíčové odkazy:
POZNÁMKA: Pokud používáte verzi 2.1.0 nebo nižší a chcete migrovat na nejnovější verzi tohoto balíčku, projděte si našeho průvodce migrací a přejděte z EventHubs V2 na EventHubs V5.
Ukázky pro v2 a dokumentaci jsou stále k dispozici tady:
zdrojový kód pro balíček | v2.1.0 (npm) | pro verze 2.1.0
Začínáme
Instalace balíčku
Instalace klientské knihovny Azure Event Hubs pomocí npm
npm install @azure/event-hubs
Aktuálně podporovaná prostředí
- LTS verze Node.js
- Nejnovější verze Safari, Chrome, Edge a Firefox.
Další podrobnosti najdete v našich zásadách podpory .
Požadavky
- Předplatné Azure
- oboru názvů služby Event Hubs
Konfigurace TypeScriptu
Uživatelé TypeScriptu musí mít nainstalované definice typu Node:
npm install @types/node
Musíte také povolit compilerOptions.allowSyntheticDefaultImports
ve svém tsconfig.json. Všimněte si, že pokud jste povolili compilerOptions.esModuleInterop
, allowSyntheticDefaultImports
je ve výchozím nastavení povolená. Další informace najdete v příručce možnosti kompilátoru TypeScriptu.
JavaScript Bundle
Pokud chcete tuto klientskou knihovnu použít v prohlížeči, musíte nejprve použít bundler. Podrobnosti o tom, jak to udělat, najdete v naší dokumentaci sdružování.
Kromě toho, co je zde popsáno, tato knihovna také potřebuje další polyfilly pro následující integrované moduly NodeJS, aby fungovala správně v prohlížečích:
buffer
os
path
process
Sdružování s Webpackem
Pokud používáte Webpack v5, můžete nainstalovat následující vývojové závislosti.
npm install --save-dev os-browserify path-browserify
pak do svého webpack.config.js přidejte následující:
const path = require("path");
+const webpack = require("webpack");
module.exports = {
entry: "./src/index.ts",
@@ -12,8 +13,21 @@ module.exports = {
},
],
},
+ plugins: [
+ new webpack.ProvidePlugin({
+ process: "process/browser",
+ }),
+ new webpack.ProvidePlugin({
+ Buffer: ["buffer", "Buffer"],
+ }),
+ ],
resolve: {
extensions: [".ts", ".js"],
+ fallback: {
+ buffer: require.resolve("buffer/"),
+ os: require.resolve("os-browserify"),
+ path: require.resolve("path-browserify"),
+ },
},
Sdružování s kumulativními funkcemi
Pokud používáte rollup bundler, nainstalujte následující vývojové závislosti.
npm install --save-dev @rollup/plugin-commonjs @rollup/plugin-inject @rollup/plugin-node-resolve
Pak do svého rollup.config.js zahrňte následující:
+import nodeResolve from "@rollup/plugin-node-resolve";
+import cjs from "@rollup/plugin-commonjs";
+import shim from "rollup-plugin-shim";
+import inject from "@rollup/plugin-inject";
export default {
// other configs
plugins: [
+ shim({
+ fs: `export default {}`,
+ net: `export default {}`,
+ tls: `export default {}`,
+ path: `export default {}`,
+ dns: `export function resolve() { }`,
+ }),
+ nodeResolve({
+ mainFields: ["module", "browser"],
+ preferBuiltins: false,
+ }),
+ cjs(),
+ inject({
+ modules: {
+ Buffer: ["buffer", "Buffer"],
+ process: "process",
+ },
+ exclude: ["./**/package.json"],
+ }),
]
};
Další informace o používání polyfills najdete v dokumentaci k vašemu oblíbenému bundleru.
Nativní podpora Reactu
Podobně jako v prohlížečích react Native nepodporuje některé javascriptové rozhraní API používané touto knihovnou sady SDK, takže pro ně musíte poskytnout polyfills. Další podrobnosti najdete v ukázce Messaging React Native s expo.
Ověření klienta
Interakce se službou Event Hubs začíná instancí třídy EventHubConsumerClient nebo instancí třídy EventHubProducerClient. Existují přetížení konstruktoru, které podporují různé způsoby vytvoření instance těchto tříd, jak je znázorněno níže:
Použití připojovacího řetězce pro obor názvů služby Event Hubs
Jedno z přetížení konstruktoru přebírá připojovací řetězec formuláře Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;
a název entity k instanci centra událostí. Můžete vytvořit skupinu příjemců a získat připojovací řetězec i název entity z webu portálu Azure Portal.
const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");
const producerClient = new EventHubProducerClient("my-connection-string", "my-event-hub");
const consumerClient = new EventHubConsumerClient(
"my-consumer-group",
"my-connection-string",
"my-event-hub"
);
Použití připojovacího řetězce pro zásady v centru událostí
Další přetížení konstruktoru přebírá připojovací řetězec odpovídající zásadám sdíleného přístupu, které jste definovali přímo v instanci centra událostí (nikoli v oboru názvů Event Hubs).
Tento připojovací řetězec bude Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-event-hub-name
formuláře .
Klíčovým rozdílem ve formátu připojovacího řetězce z předchozího přetížení konstruktoru je ;EntityPath=my-event-hub-name
.
const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");
const producerClient = new EventHubProducerClient("my-connection-string-with-entity-path");
const consumerClient = new EventHubConsumerClient(
"my-consumer-group",
"my-connection-string-with-entity-path"
);
Použití oboru názvů služby Event Hubs a identity Azure
Toto přetížení konstruktoru přebírá název hostitele a název entity vaší instance centra událostí a pověření, které implementuje tokenCredential rozhraní. To umožňuje ověřování pomocí objektu zabezpečení Azure Active Directory. V balíčku @azure/identity jsou k dispozici implementace rozhraní TokenCredential
. Název hostitele je ve formátu <yournamespace>.servicebus.windows.net
. Pokud používáte Azure Active Directory, musí mít objekt zabezpečení přiřazenou roli, která umožňuje přístup ke službě Event Hubs, jako je například role Vlastník dat služby Azure Event Hubs. Další informace o používání autorizace Azure Active Directory se službou Event Hubs najdete v přidružené dokumentaci.
const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");
const { DefaultAzureCredential } = require("@azure/identity");
const credential = new DefaultAzureCredential();
const producerClient = new EventHubProducerClient("my-host-name", "my-event-hub", credential);
const consumerClient = new EventHubConsumerClient(
"my-consumer-group",
"my-host-name",
"my-event-hub",
credential
);
Klíčové koncepty
Producent centra událostí je zdrojem telemetrických dat, diagnostických informací, protokolů o využití nebo jiných dat protokolu v rámci řešení vloženého zařízení, aplikace mobilního zařízení, názvu hry běžícího na konzole nebo jiném zařízení, některého obchodního řešení založeného na klientovi nebo serveru nebo webu.
Příjemce centra událostí tyto informace vybere z centra událostí a zpracuje je. Zpracování může zahrnovat agregaci, komplexní výpočty a filtrování. Zpracování může zahrnovat také distribuci nebo uložení informací nezpracovaným nebo transformovaným způsobem. Příjemci centra událostí jsou často robustní a vysoce škálovatelné součásti infrastruktury platformy s integrovanými analytickými možnostmi, jako jsou Azure Stream Analytics, Apache Spark nebo Apache Storm.
Oddíl je uspořádaná posloupnost událostí, které se uchovávají v centru událostí. Oddíly jsou prostředkem organizace dat přidružené k paralelismu vyžadovaným příjemci událostí. Azure Event Hubs poskytuje streamování zpráv prostřednictvím modelu dělených příjemců, ve kterém každý příjemce čte pouze určitou podmnožinu nebo oddíl streamu zpráv. Při příchodu novějších událostí se přidají na konec této sekvence. Počet oddílů je zadaný v okamžiku vytvoření centra událostí a nelze ho změnit.
Skupina uživatelů je zobrazení celého centra událostí. Skupiny uživatelů umožňují, aby každý z nich mohl využívat více aplikací, měl samostatné zobrazení datového proudu událostí a aby stream četl samostatně vlastním tempem a z vlastní pozice. V oddílu na skupinu příjemců může být maximálně 5 souběžných čtenářů; Doporučuje se však, aby pro dané spárování oddílů a skupin příjemců existoval pouze jeden aktivní příjemce. Každý aktivní čtenář obdrží všechny události ze svého oddílu; Pokud je ve stejném oddílu více čtenářů, obdrží duplicitní události.
Další koncepty a hlubší diskuze najdete v tématu: funkce služby Event Hubs
Pokyny k opakováním
EventHubConsumerClient
a EventHubProducerClient
přijímají options
, kde můžete nastavit retryOptions
, které vám umožní ladit, jak sada SDK zpracovává přechodné chyby.
Mezi příklady přechodných chyb patří dočasné problémy se sítí nebo službami.
Opakování při využívání událostí
Pokud dojde k přechodné chybě (např. k dočasnému problému se sítí), když sada SDK přijímá události, bude se opakovat příjem událostí na základě možností opakování předaných do EventHubConsumerClient
.
Pokud dojde k vyčerpání maximálního počtu pokusů o opakování, vyvolá se funkce processError
.
Pomocí nastavení opakování můžete řídit, jak rychle se dozvíte o dočasných problémech, jako je problém s připojením k síti.
Pokud například potřebujete vědět, kdy dojde k problému se sítí, můžete snížit hodnoty pro maxRetries
a retryDelayInMs
.
Po spuštění funkce processError
klient nadále přijímá události z oddílu, pokud byla chyba opakovatelná. Jinak klient vyvolá funkci processClose
poskytovanou uživatelem.
Tato funkce se také vyvolá, když zastavíte odběr nebo když klient přestane číst události z aktuálního oddílu, protože je vyzvedne jiná instance vaší aplikace jako součást vyrovnávání zatížení.
Funkce processClose
poskytuje příležitost aktualizovat kontrolní body v případě potřeby.
Po spuštění processClose
klient (nebo v případě vyrovnávání zatížení, klient z jiné instance aplikace) vyvolá funkci poskytovanou uživatelem processInitialize
, která obnoví čtení událostí z posledního aktualizovaného kontrolního bodu pro stejný oddíl.
Pokud chcete ukončit pokus o čtení událostí, musíte zavolat close()
na subscription
vrácenou metodou subscribe
.
Příklady
Následující části obsahují fragmenty kódu, které pokrývají některé běžné úlohy pomocí služby Azure Event Hubs.
- Kontrola centra událostí
- publikování událostí do centra událostí
- využívat události z centra událostí
- práce s IotHubu pomocí EventHubConsumerClient
Kontrola centra událostí
Mnoho operací centra událostí probíhá v rámci rozsahu konkrétního oddílu.
Vzhledem k tomu, že oddíly vlastní centrum událostí, jejich názvy se přiřazují při vytváření.
Pokud chcete zjistit, jaké oddíly jsou k dispozici, dotazujete centrum událostí pomocí některého ze dvou dostupných klientů: EventHubProducerClient
nebo EventHubConsumerClient
V následujícím příkladu používáme EventHubProducerClient
.
const { EventHubProducerClient } = require("@azure/event-hubs");
async function main() {
const client = new EventHubProducerClient("connectionString", "eventHubName");
const partitionIds = await client.getPartitionIds();
await client.close();
}
main();
Publikování událostí do centra událostí
Pokud chcete publikovat události, budete muset vytvořit EventHubProducerClient
. Následující příklad ukazuje jeden ze způsobů, jak vytvořit klienta, najdete v části Ověření klienta informace o dalších způsobech vytvoření instance klienta.
Události můžete publikovat do konkrétního oddílu nebo povolit službě Event Hubs rozhodnout, do kterého oddílu se mají události publikovat. Doporučujeme použít automatické směrování, když publikování událostí musí být vysoce dostupné nebo kdy se data událostí mají rovnoměrně distribuovat mezi oddíly. V následujícím příkladu využijeme automatické směrování.
- Vytvoření objektu
EventDataBatch
pomocí createBatch - Přidejte události do dávky pomocí metody tryAdd. Můžete to udělat, dokud nedosáhnete maximálního limitu velikosti dávky nebo dokud nepřidáte počet událostí, které se vám líbí, podle toho, co nastane dříve. Tato metoda by vrátila
false
, aby bylo možné do dávky přidat žádné další události z důvodu dosažení maximální velikosti dávky. - Odešle dávku událostí pomocí metody sendBatch.
V následujícím příkladu se pokusíme do služby Azure Event Hubs odeslat 10 událostí.
const { EventHubProducerClient } = require("@azure/event-hubs");
async function main() {
const producerClient = new EventHubProducerClient("connectionString", "eventHubName");
const eventDataBatch = await producerClient.createBatch();
let numberOfEventsToSend = 10;
while (numberOfEventsToSend > 0) {
const wasAdded = eventDataBatch.tryAdd({ body: "my-event-body" });
if (!wasAdded) {
break;
}
numberOfEventsToSend--;
}
await producerClient.sendBatch(eventDataBatch);
await producerClient.close();
}
main();
Existují možnosti, které můžete předat v různých fázích, abyste mohli řídit proces odesílání událostí do služby Azure Event Hubs.
- Konstruktor
EventHubProducerClient
přebírá volitelný parametr typuEventHubClientOptions
, který můžete použít k určení možností, jako je počet opakování. - Metoda
createBatch
přebírá volitelný parametr typuCreateBatchOptions
, který můžete použít ke speicify maximální velikosti dávky podporované vytvořením dávky. - Metoda
sendBatch
přebírá volitelný parametr typuSendBatchOptions
, který můžete použít k určeníabortSignal
pro zrušení aktuální operace. - V případě, že chcete odeslat do konkrétního oddílu, přetížení metody
sendBatch
umožňuje předat ID oddílu pro odesílání událostí. Výše uvedený příklad Kontrola centra událostí ukazuje, jak načíst id dostupných oddílů.
Poznámka: Při práci se službou Azure Stream Analytics by text události, která se odesílá, měl být také objekt JSON.
Příklad: body: { "message": "Hello World" }
Využívání událostí z centra událostí
Pokud chcete využívat události z instance centra událostí, musíte také vědět, na kterou skupinu příjemců chcete cílit. Jakmile to víte, jste připraveni vytvořit EventHubConsumerClient. Následující příklad ukazuje jeden ze způsobů, jak vytvořit klienta, najdete v části Ověření klienta informace o dalších způsobech vytvoření instance klienta.
Metoda subscribe
na klientovi má přetížení, které v kombinaci s konstruktorem může vyhovět několika způsobům využití událostí:
- Spotřebovávat události v jednom procesu
- využití událostí s vyrovnáváním zatížení napříč několika procesy
- Využití událostí z jednoho oddílu
Metoda subscribe
přebírá volitelný parametr typu SubscriptionOptions
, který můžete použít k určení možností, jako je maxBatchSize (počet událostí, na které se má čekat) a maxWaitTimeInSeconds (doba čekání na přijetí událostí maxBatchSize).
Využívání událostí v jednom procesu
Začněte tím, že vytvoříte instanci EventHubConsumerClient
a potom zavoláte metodu subscribe()
, která začne využívat události.
Metoda subscribe
přebírá zpětná volání ke zpracování událostí při jejich přijetí ze služby Azure Event Hubs.
Chcete-li zastavit příjem událostí, můžete volat close()
na objekt vrácený metodou subscribe()
.
const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");
async function main() {
const client = new EventHubConsumerClient(
"my-consumer-group",
"connectionString",
"eventHubName"
);
// In this sample, we use the position of earliest available event to start from
// Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
const subscriptionOptions = {
startPosition: earliestEventPosition
};
const subscription = client.subscribe(
{
processEvents: async (events, context) => {
// event processing code goes here
},
processError: async (err, context) => {
// error reporting/handling code here
}
},
subscriptionOptions
);
// Wait for a few seconds to receive events before closing
setTimeout(async () => {
await subscription.close();
await client.close();
console.log(`Exiting sample`);
}, 3 * 1000);
}
main();
Využívání událostí s vyrovnáváním zatížení napříč několika procesy
Azure Event Hubs dokáže pracovat s miliony událostí za sekundu. Pokud chcete škálovat aplikaci pro zpracování, můžete spustit několik instancí aplikace a vyrovnávat zatížení mezi sebou.
Začněte vytvořením instance EventHubConsumerClient
pomocí jednoho z přetížení konstruktoru, který vezme CheckpointStore
a potom zavolá subscribe()
metoda začít využívat události. Úložiště kontrolních bodů umožní odběratelům ve skupině příjemců koordinovat zpracování mezi několika instancemi vaší aplikace.
V tomto příkladu použijeme BlobCheckpointStore
z balíčku @azure/eventhubs-checkpointstore-blob
, který implementuje požadované čtení a zápisy do odolného úložiště pomocí služby Azure Blob Storage.
Metoda subscribe
přebírá zpětná volání ke zpracování událostí při jejich přijetí ze služby Azure Event Hubs.
Chcete-li zastavit příjem událostí, můžete volat close()
na objekt vrácený metodou subscribe()
.
const { EventHubConsumerClient } = require("@azure/event-hubs");
const { ContainerClient } = require("@azure/storage-blob");
const { BlobCheckpointStore } = require("@azure/eventhubs-checkpointstore-blob");
const storageAccountConnectionString = "storage-account-connection-string";
const containerName = "container-name";
const eventHubConnectionString = "eventhub-connection-string";
const consumerGroup = "my-consumer-group";
const eventHubName = "eventHubName";
async function main() {
const blobContainerClient = new ContainerClient(storageAccountConnectionString, containerName);
if (!(await blobContainerClient.exists())) {
await blobContainerClient.create();
}
const checkpointStore = new BlobCheckpointStore(blobContainerClient);
const consumerClient = new EventHubConsumerClient(
consumerGroup,
eventHubConnectionString,
eventHubName,
checkpointStore
);
const subscription = consumerClient.subscribe({
processEvents: async (events, context) => {
// event processing code goes here
if (events.length === 0) {
// If the wait time expires (configured via options in maxWaitTimeInSeconds) Event Hubs
// will pass you an empty array.
return;
}
// Checkpointing will allow your service to pick up from
// where it left off when restarting.
//
// You'll want to balance how often you checkpoint with the
// performance of your underlying checkpoint store.
await context.updateCheckpoint(events[events.length - 1]);
},
processError: async (err, context) => {
// handle any errors that occur during the course of
// this subscription
console.log(`Errors in subscription to partition ${context.partitionId}: ${err}`);
}
});
// Wait for a few seconds to receive events before closing
await new Promise((resolve) => setTimeout(resolve, 10 * 1000));
await subscription.close();
await consumerClient.close();
console.log(`Exiting sample`);
}
main();
Další informace najdete v tématu Vyrovnávání zatížení oddílů napříč několika instancemi vaší aplikace.
Využívání událostí z jednoho oddílu
Začněte tím, že vytvoříte instanci EventHubConsumerClient
a potom zavoláte metodu subscribe()
, která začne využívat události. Předejte ID oddílu, na který chcete cílit, do metody subscribe()
, aby se spotřebovalo pouze z tohoto oddílu.
V následujícím příkladu používáme první oddíl.
Metoda subscribe
přebírá zpětná volání ke zpracování událostí při jejich přijetí ze služby Azure Event Hubs.
Chcete-li zastavit příjem událostí, můžete volat close()
na objekt vrácený metodou subscribe()
.
const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");
async function main() {
const client = new EventHubConsumerClient(
"my-consumer-group",
"connectionString",
"eventHubName"
);
const partitionIds = await client.getPartitionIds();
// In this sample, we use the position of earliest available event to start from
// Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
const subscriptionOptions = {
startPosition: earliestEventPosition
};
const subscription = client.subscribe(
partitionIds[0],
{
processEvents: async (events, context) => {
// event processing code goes here
},
processError: async (err, context) => {
// error reporting/handling code here
}
},
subscriptionOptions
);
// Wait for a few seconds to receive events before closing
setTimeout(async () => {
await subscription.close();
await client.close();
console.log(`Exiting sample`);
}, 3 * 1000);
}
main();
Práce s IotHubem pomocí EventHubConsumerClient
K práci s IotHubem můžete použít také EventHubConsumerClient
. To je užitečné pro příjem telemetrických dat IotHubu z propojeného EventHubu.
Přidružený připojovací řetězec nebude obsahovat deklarace identity, takže odesílání událostí není možné.
- Všimněte si, že připojovací řetězec musí být pro koncový bod kompatibilní s centrem událostí (např. "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name")
const { EventHubConsumerClient } = require("@azure/event-hubs");
async function main() {
const client = new EventHubConsumerClient(
"my-consumer-group",
"Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name"
);
await client.getEventHubProperties();
// retrieve partitionIds from client.getEventHubProperties() or client.getPartitionIds()
const partitionId = "0";
await client.getPartitionProperties(partitionId);
await client.close();
}
main();
Řešení problémů
Závislosti AMQP
Knihovna Event Hubs závisí na knihovně rhea-promise ke správě připojení, odesílání a přijímání událostí přes protokol AMQP.
Protokolování
Proměnnou prostředí AZURE_LOG_LEVEL
můžete nastavit tak, aby se protokolování povolilo stderr
:
export AZURE_LOG_LEVEL=verbose
Podrobnější pokyny k povolení protokolů najdete v dokumentaci k @azure/protokolovacímu balíčku.
Případně můžete nastavit proměnnou prostředí DEBUG
pro získání protokolů při použití této knihovny.
To může být užitečné, pokud chcete také generovat protokoly ze závislostí rhea-promise
a rhea
.
Poznámka: AZURE_LOG_LEVEL, pokud je nastavena, má přednost před laděním.
Při zadávání AZURE_LOG_LEVEL nebo volání setLogLevel nezadávejte žádné knihovny azure
prostřednictvím ladění.
- Získání protokolů ladění na úrovni informací ze sady Event Hubs SDK
export DEBUG=azure:*:info
- Získání protokolů ladění ze sady Event Hubs SDK a knihovny na úrovni protokolu
export DEBUG=azure*,rhea*
- Pokud nechcete zobrazovat nezpracovaná data událostí (což spotřebovává velké množství místa na konzole nebo disku), můžete proměnnou prostředí
DEBUG
nastavit následujícím způsobem:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message
- Pokud vás zajímají jenom chyby a upozornění sady SDK , můžete proměnnou prostředí
DEBUG
nastavit následujícím způsobem:
export DEBUG=azure:*:(error|warning),rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow
Další kroky
Další ukázkový kód
Podrobné příklady použití této knihovny k odesílání a přijímání událostí do a z
Přispívající
Pokud chcete přispívat do této knihovny, přečtěte si průvodce přispívání a přečtěte si další informace o vytváření a testování kódu.
Azure SDK for JavaScript