Azure Event Hubs klientbibliotek för JavaScript – version 5.12.0
Azure Event Hubs är en mycket skalbar publicera-prenumerera-tjänst som kan mata in miljontals händelser per sekund och strömma dem till flera konsumenter. På så sätt kan du bearbeta och analysera de enorma mängder data som produceras av dina anslutna enheter och program. Om du vill veta mer om Azure Event Hubs kanske du vill granska: Vad är Event Hubs?
Med Azure Event Hubs klientbibliotek kan du skicka och ta emot händelser i ditt Node.js program.
Nyckellänkar:
Obs! Om du använder version 2.1.0 eller senare och vill migrera till den senaste versionen av det här paketet kan du läsa vår migreringsguide för att gå från EventHubs V2 till EventHubs V5
Exempel för v2 och dokumentation finns fortfarande här:
Källkod för v2.1.0 | Paket för v2.1.0 (npm) | Exempel för v2.1.0
Komma igång
Installera paketet
Installera Azure Event Hubs-klientbiblioteket med npm
npm install @azure/event-hubs
Miljöer som stöds för närvarande
- LTS-versioner av Node.js
- De senaste versionerna av Safari, Chrome, Edge och Firefox.
Mer information finns i vår supportpolicy .
Förutsättningar
Konfigurera TypeScript
TypeScript-användare måste ha nodtypsdefinitioner installerade:
npm install @types/node
Du måste också aktivera compilerOptions.allowSyntheticDefaultImports
i din tsconfig.json. Observera att om du har aktiverat compilerOptions.esModuleInterop
allowSyntheticDefaultImports
är aktiverad som standard. Mer information finns i Handbook för kompilatoralternativ i TypeScript .
JavaScript-paket
Om du vill använda det här klientbiblioteket i webbläsaren måste du först använda en bundler. Mer information om hur du gör detta finns i vår paketeringsdokumentation.
Förutom det som beskrivs där behöver det här biblioteket även ytterligare polyfiller för följande inbyggda NodeJS-kärnmoduler för att fungera korrekt i webbläsarna:
buffer
os
path
process
Paketering med Webpack
Om du använder Webpack v5 kan du installera följande dev-beroenden
npm install --save-dev os-browserify path-browserify
lägg sedan till följande i webpack.config.js
const path = require("path");
+const webpack = require("webpack");
module.exports = {
entry: "./src/index.ts",
@@ -12,8 +13,21 @@ module.exports = {
},
],
},
+ plugins: [
+ new webpack.ProvidePlugin({
+ process: "process/browser",
+ }),
+ new webpack.ProvidePlugin({
+ Buffer: ["buffer", "Buffer"],
+ }),
+ ],
resolve: {
extensions: [".ts", ".js"],
+ fallback: {
+ buffer: require.resolve("buffer/"),
+ os: require.resolve("os-browserify"),
+ path: require.resolve("path-browserify"),
+ },
},
Paketering med sammanslagning
Om du använder Sammanslagningspaket installerar du följande dev-beroenden
npm install --save-dev @rollup/plugin-commonjs @rollup/plugin-inject @rollup/plugin-node-resolve
Ta sedan med följande i rollup.config.js
+import nodeResolve from "@rollup/plugin-node-resolve";
+import cjs from "@rollup/plugin-commonjs";
+import shim from "rollup-plugin-shim";
+import inject from "@rollup/plugin-inject";
export default {
// other configs
plugins: [
+ shim({
+ fs: `export default {}`,
+ net: `export default {}`,
+ tls: `export default {}`,
+ path: `export default {}`,
+ dns: `export function resolve() { }`,
+ }),
+ nodeResolve({
+ mainFields: ["module", "browser"],
+ preferBuiltins: false,
+ }),
+ cjs(),
+ inject({
+ modules: {
+ Buffer: ["buffer", "Buffer"],
+ process: "process",
+ },
+ exclude: ["./**/package.json"],
+ }),
]
};
Mer information om hur du använder polyfiller finns i dokumentationen för ditt favoritpaket.
React Native support
I likhet med webbläsare stöder React Native inte vissa JavaScript-API:er som används av det här SDK-biblioteket, så du måste ange polyfiller för dem. Mer information finns i exemplet React Native meddelanden med Expo.
Autentisera klienten
Interaktionen med Event Hubs börjar med antingen en instans av klassen EventHubConsumerClient eller en instans av klassen EventHubProducerClient . Det finns konstruktoröverlagringar som stöder olika sätt att instansiera dessa klasser enligt nedan:
Använda anslutningssträng för Event Hubs-namnområdet
En av konstruktorns överlagringar tar en anslutningssträng av formuläret Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;
och entitetsnamnet till din Event Hub-instans. Du kan skapa en konsumentgrupp och hämta anslutningssträng samt entitetsnamnet från Azure Portal.
const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");
const producerClient = new EventHubProducerClient("my-connection-string", "my-event-hub");
const consumerClient = new EventHubConsumerClient(
"my-consumer-group",
"my-connection-string",
"my-event-hub"
);
Använda anslutningssträng för principer på händelsehubben
En annan konstruktoröverbelastning tar anslutningssträng som motsvarar principen för delad åtkomst som du har definierat direkt på Event Hub-instansen (och inte Event Hubs-namnområdet).
Den här anslutningssträng kommer att ha formatet Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-event-hub-name
.
Den viktigaste skillnaden i anslutningssträng format från den tidigare konstruktorns överlagring är ;EntityPath=my-event-hub-name
.
const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");
const producerClient = new EventHubProducerClient("my-connection-string-with-entity-path");
const consumerClient = new EventHubConsumerClient(
"my-consumer-group",
"my-connection-string-with-entity-path"
);
Använda Event Hubs-namnområdet och Azure Identity
Den här konstruktoröverbelastningen tar värdnamnet och entitetsnamnet för din Event Hub-instans och autentiseringsuppgifter som implementerar TokenCredential-gränssnittet. På så sätt kan du autentisera med hjälp av ett Azure Active Directory-huvudnamn. Det finns implementeringar av TokenCredential
gränssnittet i @azure/identitetspaketet . Värdnamnet har formatet <yournamespace>.servicebus.windows.net
. När du använder Azure Active Directory måste huvudkontot tilldelas en roll som ger åtkomst till Event Hubs, till exempel rollen Azure Event Hubs dataägare. Mer information om hur du använder Azure Active Directory-auktorisering med Event Hubs finns i den associerade dokumentationen.
const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");
const { DefaultAzureCredential } = require("@azure/identity");
const credential = new DefaultAzureCredential();
const producerClient = new EventHubProducerClient("my-host-name", "my-event-hub", credential);
const consumerClient = new EventHubConsumerClient(
"my-consumer-group",
"my-host-name",
"my-event-hub",
credential
);
Viktiga begrepp
En händelsehubbproducent är en källa till telemetridata, diagnostikinformation, användningsloggar eller andra loggdata, som en del av en inbäddad enhetslösning, ett mobilt enhetsprogram, en speltitel som körs på en konsol eller annan enhet, en klient- eller serverbaserad affärslösning eller en webbplats.
En händelsehubbkonsument hämtar sådan information från händelsehubben och bearbetar den. Bearbetningen kan omfatta aggregering, komplex beräkning och filtrering. Bearbetningen kan också omfatta distribution eller lagring av informationen på ett obearbetat eller transformerat sätt. Event Hub-konsumenter är ofta robusta och storskaliga plattformsinfrastrukturdelar med inbyggda analysfunktioner som Azure Stream Analytics, Apache Spark eller Apache Storm.
En partition är en ordnad sekvens av händelser som lagras i en händelsehubb. Partitioner är ett sätt att organisera data som är associerade med den parallellitet som krävs av händelsekonsumenter. Azure Event Hubs tillhandahåller meddelandeströmning via ett partitionerat konsumentmönster där varje konsument endast läser en viss delmängd, eller partition, av meddelandeströmmen. När nya händelser anländer läggs de till i slutet av denna sekvens. Antalet partitioner anges när en händelsehubb skapas och kan inte ändras.
En konsumentgrupp är en vy över en hel händelsehubb. Konsumentgrupper gör det möjligt för flera konsumerande program att ha en separat vy över händelseströmmen och att läsa dataströmmen oberoende av varandra i sin egen takt och från sin egen position. Det kan finnas högst 5 samtidiga läsare på en partition per konsumentgrupp. Vi rekommenderar dock att det bara finns en aktiv konsument för en viss partition och parkoppling av konsumentgrupper. Varje aktiv läsare tar emot alla händelser från partitionen. Om det finns flera läsare på samma partition får de dubbletthändelser.
Fler begrepp och djupare diskussion finns i: Event Hubs-funktioner
Vägledning kring återförsök
Och EventHubConsumerClient
EventHubProducerClient
acceptera options
var du kan ange retryOptions
som gör att du kan finjustera hur SDK hanterar tillfälliga fel.
Exempel på tillfälliga fel är tillfälliga nätverk eller tjänstproblem.
Återförsök vid användning av händelser
Om ett tillfälligt fel (t.ex. ett tillfälligt nätverksproblem) påträffas när SDK:n tar emot händelser kommer det att försöka ta emot händelser igen baserat på de återförsöksalternativ som skickas till EventHubConsumerClient
.
Om det maximala antalet återförsök är slut processError
anropas funktionen.
Du kan använda återförsöksinställningarna för att styra hur snabbt du informeras om tillfälliga problem, till exempel ett problem med nätverksanslutningen.
Om du till exempel behöver veta när det finns ett nätverksproblem direkt kan du sänka värdena för maxRetries
och retryDelayInMs
.
När funktionen har körts processError
fortsätter klienten att ta emot händelser från partitionen så länge felet var ett återförsök. Annars anropar klienten funktionen som tillhandahålls av processClose
användaren.
Den här funktionen anropas också när du stoppar prenumerationen eller när klienten slutar läsa händelser från den aktuella partitionen på grund av att den hämtas av en annan instans av ditt program som en del av belastningsutjämningen.
Funktionen processClose
ger möjlighet att uppdatera kontrollpunkter om det behövs.
När du har kört processClose
anropar klienten (eller i händelse av belastningsutjämning, en klient från en annan instans av ditt program) funktionen som tillhandahålls processInitialize
av användaren för att återuppta läsningen av händelser från den senast uppdaterade kontrollpunkten för samma partition.
Om du vill sluta försöka läsa händelser måste du anropa close()
på den subscription
som returneras av subscribe
metoden .
Exempel
Följande avsnitt innehåller kodfragment som täcker några av de vanliga uppgifterna med hjälp av Azure Event Hubs
- Inspektera en händelsehubb
- Publicera händelser till en händelsehubb
- Använda händelser från en händelsehubb
- Använda EventHubConsumerClient för att arbeta med IotHub
Inspektera en händelsehubb
Många event hub-åtgärder utförs inom omfånget för en specifik partition.
Eftersom partitioner ägs av händelsehubben tilldelas deras namn när de skapas.
För att förstå vilka partitioner som är tillgängliga frågar du händelsehubben med någon av de två tillgängliga klienterna: EventHubProducerClient
eller EventHubConsumerClient
I exemplet nedan använder vi en EventHubProducerClient
.
const { EventHubProducerClient } = require("@azure/event-hubs");
async function main() {
const client = new EventHubProducerClient("connectionString", "eventHubName");
const partitionIds = await client.getPartitionIds();
await client.close();
}
main();
Publicera händelser till en händelsehubb
För att kunna publicera händelser måste du skapa en EventHubProducerClient
. I exemplet nedan visas ett sätt att skapa klienten i avsnittet Autentisera klienten för att lära dig andra sätt att instansiera klienten.
Du kan publicera händelser till en viss partition eller låta Event Hubs-tjänsten bestämma vilka partitionshändelser som ska publiceras till. Vi rekommenderar att du använder automatisk routning när publiceringen av händelser måste ha hög tillgänglighet eller när händelsedata ska fördelas jämnt mellan partitionerna. I exemplet nedan använder vi automatisk routning.
- Skapa ett
EventDataBatch
objekt med createBatch - Lägg till händelser i batchen med metoden tryAdd . Du kan göra detta tills den maximala batchstorleksgränsen har nåtts eller tills du är klar med att lägga till antalet händelser som du har gillat, beroende på vilket som inträffar först. Den här metoden returnerar
false
för att indikera att inga fler händelser kan läggas till i batchen på grund av den maximala batchstorleken som nås. - Skicka batchen med händelser med metoden sendBatch .
I exemplet nedan försöker vi skicka 10 händelser till Azure Event Hubs.
const { EventHubProducerClient } = require("@azure/event-hubs");
async function main() {
const producerClient = new EventHubProducerClient("connectionString", "eventHubName");
const eventDataBatch = await producerClient.createBatch();
let numberOfEventsToSend = 10;
while (numberOfEventsToSend > 0) {
let wasAdded = eventDataBatch.tryAdd({ body: "my-event-body" });
if (!wasAdded) {
break;
}
numberOfEventsToSend--;
}
await producerClient.sendBatch(eventDataBatch);
await producerClient.close();
}
main();
Det finns alternativ som du kan skicka i olika steg för att styra processen för att skicka händelser till Azure Event Hubs.
- Konstruktorn
EventHubProducerClient
tar en valfri parameter av typenEventHubClientOptions
som du kan använda för att ange alternativ som antal återförsök. - Metoden
createBatch
tar en valfri parameter av typenCreateBatchOptions
som du kan använda för att speicifiera den maximala batchstorlek som stöds av batchen som skapas. - Metoden
sendBatch
tar en valfri parameter av typenSendBatchOptions
som du kan använda för att angeabortSignal
för att avbryta den aktuella åtgärden. - Om du vill skicka till en specifik partition kan du med en överlagring av
sendBatch
metoden skicka id:t för partitionen att skicka händelser till. Exemplet Granska en händelsehubb ovan visar hur du hämtar de tillgängliga partitions-ID:erna.
Obs! När du arbetar med Azure Stream Analytics ska även brödtexten för händelsen som skickas vara ett JSON-objekt.
Exempelvis: body: { "message": "Hello World" }
Använda händelser från en händelsehubb
Om du vill använda händelser från en Event Hub-instans behöver du också veta vilken konsumentgrupp du vill rikta in dig på. När du vet detta är du redo att skapa en EventHubConsumerClient. I exemplet nedan visas ett sätt att skapa klienten i avsnittet Autentisera klienten för att lära dig andra sätt att instansiera klienten.
Metoden subscribe
på klienten har överlagringar som i kombination med konstruktorn kan hantera flera sätt att använda händelser:
- Använda händelser i en enda process
- Använda händelser med belastningsutjämning över flera processer
- Använda händelser från en enda partition
Metoden subscribe
tar en valfri parameter av typen SubscriptionOptions
som du kan använda för att ange alternativ som maxBatchSize (antal händelser att vänta på) och maxWaitTimeInSeconds (tid att vänta på att maxBatchSize-händelser ska tas emot).
Använda händelser i en enda process
Börja med att skapa en instans av EventHubConsumerClient
och anropa sedan metoden på den subscribe()
för att börja använda händelser.
Metoden subscribe
tar emot återanrop för att bearbeta händelser när de tas emot från Azure Event Hubs.
Om du vill sluta ta emot händelser kan du anropa close()
objektet som returneras av subscribe()
metoden.
const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");
async function main() {
const client = new EventHubConsumerClient(
"my-consumer-group",
"connectionString",
"eventHubName"
);
// In this sample, we use the position of earliest available event to start from
// Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
const subscriptionOptions = {
startPosition: earliestEventPosition
};
const subscription = client.subscribe(
{
processEvents: async (events, context) => {
// event processing code goes here
},
processError: async (err, context) => {
// error reporting/handling code here
}
},
subscriptionOptions
);
// Wait for a few seconds to receive events before closing
setTimeout(async () => {
await subscription.close();
await client.close();
console.log(`Exiting sample`);
}, 3 * 1000);
}
main();
Använda händelser med belastningsutjämning över flera processer
Azure Event Hubs kan hantera miljontals händelser per sekund. Om du vill skala bearbetningsprogrammet kan du köra flera instanser av ditt program och låta det balansera belastningen sinsemellan.
Börja med att skapa en instans av EventHubConsumerClient
att använda en av konstruktorns överlagringar som tar en CheckpointStore
och anropa subscribe()
sedan metoden för att börja använda händelser. Kontrollpunktsarkivet gör det möjligt för prenumeranterna i en konsumentgrupp att samordna bearbetningen mellan flera instanser av ditt program.
I det här exemplet använder BlobCheckpointStore
vi från @azure/eventhubs-checkpointstore-blob
paketet som implementerar de nödvändiga läs-/skrivåtgärderna till ett varaktigt arkiv med hjälp av Azure Blob Storage.
Metoden subscribe
tar emot återanrop för att bearbeta händelser när de tas emot från Azure Event Hubs.
Om du vill sluta ta emot händelser kan du anropa close()
objektet som returneras av subscribe()
metoden.
const { EventHubConsumerClient } = require("@azure/event-hubs");
const { ContainerClient } = require("@azure/storage-blob");
const { BlobCheckpointStore } = require("@azure/eventhubs-checkpointstore-blob");
const storageAccountConnectionString = "storage-account-connection-string";
const containerName = "container-name";
const eventHubConnectionString = "eventhub-connection-string";
const consumerGroup = "my-consumer-group";
const eventHubName = "eventHubName";
async function main() {
const blobContainerClient = new ContainerClient(storageAccountConnectionString, containerName);
if (!(await blobContainerClient.exists())) {
await blobContainerClient.create();
}
const checkpointStore = new BlobCheckpointStore(blobContainerClient);
const consumerClient = new EventHubConsumerClient(
consumerGroup,
eventHubConnectionString,
eventHubName,
checkpointStore
);
const subscription = consumerClient.subscribe({
processEvents: async (events, context) => {
// event processing code goes here
if (events.length === 0) {
// If the wait time expires (configured via options in maxWaitTimeInSeconds) Event Hubs
// will pass you an empty array.
return;
}
// Checkpointing will allow your service to pick up from
// where it left off when restarting.
//
// You'll want to balance how often you checkpoint with the
// performance of your underlying checkpoint store.
await context.updateCheckpoint(events[events.length - 1]);
},
processError: async (err, context) => {
// handle any errors that occur during the course of
// this subscription
console.log(`Errors in subscription to partition ${context.partitionId}: ${err}`);
}
});
// Wait for a few seconds to receive events before closing
await new Promise((resolve) => setTimeout(resolve, 10 * 1000));
await subscription.close();
await consumerClient.close();
console.log(`Exiting sample`);
}
main();
Mer information finns i Balansera partitionsbelastning över flera instanser av ditt program .
Använda händelser från en enda partition
Börja med att skapa en instans av EventHubConsumerClient
och anropa sedan metoden på den subscribe()
för att börja använda händelser. Skicka ID:t för partitionen som du vill rikta till subscribe()
metoden för att endast använda från partitionen.
I exemplet nedan använder vi den första partitionen.
Metoden subscribe
tar emot återanrop för att bearbeta händelser när de tas emot från Azure Event Hubs.
Om du vill sluta ta emot händelser kan du anropa close()
objektet som returneras av subscribe()
metoden.
const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");
async function main() {
const client = new EventHubConsumerClient(
"my-consumer-group",
"connectionString",
"eventHubName"
);
const partitionIds = await client.getPartitionIds();
// In this sample, we use the position of earliest available event to start from
// Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
const subscriptionOptions = {
startPosition: earliestEventPosition
};
const subscription = client.subscribe(
partitionIds[0],
{
processEvents: async (events, context) => {
// event processing code goes here
},
processError: async (err, context) => {
// error reporting/handling code here
}
},
subscriptionOptions
);
// Wait for a few seconds to receive events before closing
setTimeout(async () => {
await subscription.close();
await client.close();
console.log(`Exiting sample`);
}, 3 * 1000);
}
main();
Använda EventHubConsumerClient för att arbeta med IotHub
Du kan även använda EventHubConsumerClient
för att arbeta med IotHub. Detta är användbart för att ta emot telemetridata för IotHub från den länkade EventHub.
Den associerade anslutningssträng kommer inte att ha skicka anspråk, därför är det inte möjligt att skicka händelser.
- Observera att anslutningssträng måste vara för en Event Hub-kompatibel slutpunkt (t.ex. "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name")
const { EventHubConsumerClient } = require("@azure/event-hubs");
async function main() {
const client = new EventHubConsumerClient(
"my-consumer-group",
"Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name"
);
await client.getEventHubProperties();
// retrieve partitionIds from client.getEventHubProperties() or client.getPartitionIds()
const partitionId = "0";
await client.getPartitionProperties(partitionId);
await client.close();
}
main();
Felsökning
AMQP-beroenden
Event Hubs-biblioteket är beroende av rhea-promise-biblioteket för att hantera anslutningar, skicka och ta emot händelser över AMQP-protokollet .
Loggning
Du kan ange AZURE_LOG_LEVEL
miljövariabeln för att aktivera loggning till stderr
:
export AZURE_LOG_LEVEL=verbose
Mer detaljerade anvisningar om hur du aktiverar loggar finns i @azure-/loggningspaketdokumenten.
Du kan också ange DEBUG
miljövariabeln för att hämta loggar när du använder det här biblioteket.
Detta kan vara användbart om du också vill generera loggar från beroendena rhea-promise
och rhea
även.
Observera: AZURE_LOG_LEVEL, om den anges, har företräde framför FELSÖKNING.
Ange azure
inga bibliotek via FELSÖKNING när du också anger AZURE_LOG_LEVEL eller anropar setLogLevel.
- Hämta endast felsökningsloggar på informationsnivå från Event Hubs SDK.
export DEBUG=azure:*:info
- Hämta felsökningsloggar från Event Hubs SDK och biblioteket på protokollnivå.
export DEBUG=azure*,rhea*
- Om du inte är intresserad av att visa rådata (som förbrukar en stor mängd konsol-/diskutrymme) kan du ange
DEBUG
miljövariabeln på följande sätt:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message
- Om du bara är intresserad av fel och SDK-varningar kan du ange
DEBUG
miljövariabeln på följande sätt:
export DEBUG=azure:*:(error|warning),rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow
Nästa steg
Mer exempelkod
Ta en titt på exempelkatalogen för detaljerade exempel på hur du använder det här biblioteket för att skicka och ta emot händelser till/från Event Hubs.
Bidra
Om du vill bidra till det här biblioteket kan du läsa bidragsguiden för att lära dig mer om hur du skapar och testar koden.
Azure SDK for JavaScript