Zelfstudie: Azure Functions uitvoeren vanuit Azure Stream Analytics-taken
In deze zelfstudie maakt u een Azure Stream Analytics-taak die gebeurtenissen van Azure Event Hubs leest, een query uitvoert op de gebeurtenisgegevens en vervolgens een Azure-functie aanroept die naar een Azure Cache voor Redis exemplaar schrijft.
Notitie
- U kunt Azure Functions uitvoeren vanuit Azure Stream Analytics door Functions te configureren als een van de sinks (uitvoer) naar de Stream Analytics-taak. Azure Functions is een gebeurtenisafhankelijke, compute-on-demand-ervaring waarmee u code kunt implementeren die wordt geactiveerd door gebeurtenissen in Azure of services van derden. Door deze mogelijkheid van Azure Functions om op triggers te reageren, is het een natuurlijke uitvoer naar Stream Analytics-taken.
- Stream Analytics activeert Azure Functions via HTTP-triggers. De uitvoeradapter van Azure Functions biedt gebruikers de mogelijkheid om Azure Functions te verbinden met Stream Analytics. Gebeurtenissen kunnen dan worden geactiveerd op basis van Stream Analytics-query's.
- Verbinding met Azure Functions binnen een virtueel netwerk (VNet) vanuit een Stream Analytics-taak die wordt uitgevoerd in een multitenantcluster, wordt niet ondersteund.
In deze zelfstudie leert u het volgende:
- Een Azure Event Hubs-exemplaar maken
- Een instantie van Azure Cache voor Redis maken
- Een Azure-functie maken
- Een Stream Analytics-taak maken
- Event Hub configureren als invoer en functie als uitvoer
- Voer de Stream Analytics-taak uit
- Azure Cache voor Redis controleren op resultaten
Als u nog geen abonnement op Azure hebt, maakt u een gratis account aan voordat u begint.
Vereisten
Voordat u begint, moet u ervoor zorgen dat u de volgende stappen hebt uitgevoerd:
- Als u geen Azure-abonnement hebt, maakt u een gratis account.
- Download de gebeurtenisgenerator-app voor telefoongesprekken, TelcoGenerator.zip vanuit het Microsoft Downloadcentrum of haal de broncode op van GitHub.
Aanmelden bij Azure
Meld u aan bij het Azure-portaal.
Een Event Hub maken
U moet enkele voorbeeldgegevens verzenden naar een Event Hub voordat Stream Analytics de gegevensstroom voor frauduleuze gesprekken kan analyseren. In deze zelfstudie verzendt u gegevens naar Azure met behulp van Azure Event Hubs.
Gebruik de volgende stappen om een Event Hub te maken en oproepgegevens naar die Event Hub te verzenden:
Meld u aan bij het Azure-portaal.
Selecteer Alle services in het linkermenu, selecteer Internet of Things, beweeg de muis over Event Hubs en selecteer vervolgens de knop + (Toevoegen).
Voer op de pagina Naamruimte maken de volgende stappen uit:
Selecteer een Azure-abonnement waar u de Event Hub wilt maken.
Voor de resourcegroep selecteert u Nieuwe maken en voert u een naam in voor de resourcegroep. De Event Hubs-naamruimte wordt gemaakt in deze resourcegroep.
Voer voor naamruimtenaam een unieke naam in voor de Event Hubs-naamruimte.
Selecteer voor Locatie de regio waarin u de naamruimte wilt maken.
Selecteer Standard voor prijscategorie.
Selecteer Controleren en maken onderaan de pagina.
Selecteer onder aan de pagina Controleren en maken van de wizard Naamruimte maken de optie Maken onder aan de pagina nadat u alle instellingen hebt bekeken.
Nadat de naamruimte is geïmplementeerd, selecteert u Ga naar de resource om naar de pagina Event Hubs-naamruimte te navigeren.
Selecteer +Event Hub op de opdrachtbalk op de pagina Event Hubs-naamruimte.
Voer op de pagina Event Hub maken een naam in voor de Event Hub. Stel Aantal partities in op 2. Gebruik de standaardopties in de overige instellingen en selecteer Beoordelen en maken.
Selecteer onder aan de pagina Controleren en maken de optie Maken. Wacht tot de implementatie is voltooid.
Toegang verlenen tot de event hub en een verbindingsreeks ophalen
Voordat een toepassing gegevens naar Azure Event Hubs kan verzenden, moet de event hub een beleid hebben waarmee toegang wordt verleend. Het toegangsbeleid genereert een verbindingsreeks die autorisatiegegevens bevat.
Selecteer op de pagina Event Hubs-naamruimte beleid voor gedeelde toegang in het linkermenu.
Selecteer RootManageSharedAccessKey in de lijst met beleidsregels.
Selecteer vervolgens de knop Kopiëren naast Verbindingsreeks - primaire sleutel.
Plak de verbindingsreeks in een teksteditor. U hebt deze verbindingsreeks nodig in de volgende sectie.
De verbindingsreeks ziet er als volgt uit:
Endpoint=sb://<Your event hub namespace>.servicebus.windows.net/;SharedAccessKeyName=<Your shared access policy name>;SharedAccessKey=<generated key>
U ziet dat de verbindingsreeks meerdere sleutel-waardeparen bevat, gescheiden door puntkomma's: Eindpunt, SharedAccessKeyName en SharedAccessKey.
De app voor het genereren van gebeurtenissen starten
Voordat u de app TelcoGenerator start, moet u deze configureren voor het verzenden van gegevens naar de Azure Event Hubs die u eerder hebt gemaakt.
Pak de inhoud van het bestand TelcoGenerator.zip uit.
Open het
TelcoGenerator\TelcoGenerator\telcodatagen.exe.config
bestand in een teksteditor van uw keuze Er is meer dan één.config
bestand, dus zorg ervoor dat u het juiste bestand opent.Update het
<appSettings>
-element in het configuratiebestand met de volgende details:- Stel de waarde van de EventHubName-sleutel in op de waarde van EntityPath aan het einde van de verbindingsreeks.
- Stel de waarde van de sleutel Microsoft.ServiceBus.ConnectionString in op de verbindingsreeks op de naamruimte. Als u een verbindingsreeks gebruikt voor een Event Hub, niet een naamruimte, verwijdert
EntityPath
u de waarde (;EntityPath=myeventhub
) aan het einde. Vergeet niet de puntkomma te verwijderen die voorafgaat aan de EntityPath-waarde.
Sla het bestand op.
Open vervolgens een opdrachtvenster en ga naar de map waar u de toepassing TelcoGenerator hebt uitgepakt. Voer vervolgens de volgende opdracht in:
.\telcodatagen.exe 1000 0.2 2
Voor deze opdracht worden de volgende parameters gebruikt:
- Aantal records met gespreksgegevens per uur.
- Percentage van fraudekans: hoe vaak de app een frauduleus gesprek moet simuleren. De waarde 0,2 betekent dat ongeveer 20% van de gespreksrecords frauduleus lijken.
- Tijd in uren: het aantal uren dat de app moet worden uitgevoerd. U kunt de app ook op elk gewenst moment stoppen door het proces op de opdrachtregel te beëindigen (met Ctrl+C).
Na enkele seconden begint de app met het weergeven van telefoongesprekrecords op het scherm wanneer deze naar de event hub worden gestuurd. De telefoongesprekgegevens bevatten de volgende velden:
Record Definitie CallrecTime Het tijdstempel voor de begintijd van de oproep. SwitchNum Het schakelnummer van de oproep. In dit voorbeeld zijn de schakelnummers tekenreeksen die staan voor het land/de regio van herkomst (VS, China, UK, Duitsland of Australië). CallingNum Het telefoonnummer van de beller. CallingIMSI De International Mobile Subscriber Identity (IMSI). Dit is een unieke id van de beller. CalledNum Het telefoonnummer van de ontvanger. CalledIMSI De International Mobile Subscriber Identity (IMSI). Dit is een unieke id van de ontvanger.
Een Stream Analytics-taak maken
Nu u een stream van gesprekgebeurtenissen hebt, kunt u een Stream Analytics-taak maken die gegevens uit de event hub leest.
- Als u een Stream Analytics-taak wilt maken, gaat u naar de Azure-portal.
- Selecteer Een resource maken en zoek naar Stream Analytics-taak. Selecteer de tegel Stream Analytics-taak en selecteer Maken.
- Voer op de pagina Nieuwe Stream Analytics-taak de volgende stappen uit:
Selecteer voor Abonnement het abonnement dat de Event Hubs-naamruimte bevat.
Selecteer voor Resourcegroep de resourcegroep die u eerder hebt gemaakt.
Voer in de sectie Instantiedetails voor Naam een unieke naam in voor de Stream Analytics-taak.
Selecteer voor Regio de regio waarin u de Stream Analytics-taak wilt maken. We raden u aan om de taak en de Event Hub in dezelfde regio te plaatsen voor de beste prestaties en zodat u niet betaalt om gegevens over te dragen tussen regio's.
Selecteer cloud als deze nog niet is geselecteerd voor hostingomgeving<. Stream Analytics-taken kunnen worden geïmplementeerd in Cloud of in Edge. Met cloud kunt u implementeren in Azure Cloud en Met Edge kunt u implementeren op een IoT Edge-apparaat.
Selecteer 1 voor streaming-eenheden. Streaming-eenheden vertegenwoordigen de computerresources die nodig zijn om een taak uit te voeren. Deze waarde is standaard ingesteld op 1. Zie het artikel Streaming-eenheden begrijpen en aanpassen voor meer informatie over het schalen van streaming-eenheden.
Selecteer Controleren en maken onderaan de pagina.
- Controleer de instellingen op de pagina Beoordelen en maken en selecteer Vervolgens Maken om de Stream Analytics-taak te maken.
- Nadat de taak is geïmplementeerd, selecteert u Ga naar de resource om naar de pagina van de Stream Analytics-taak te navigeren.
Taakinvoer configureren
In de volgende stap definieert u een invoerbron voor de taak om gegevens te kunnen lezen met de Event Hub die u in de vorige sectie hebt gemaakt.
Selecteer Invoer in de sectie Taaktopologie in het linkermenu op de pagina Stream Analytics-taak.
Selecteer + Invoer en Event Hub toevoegen op de pagina Invoer.
Voer de volgende stappen uit op de Event Hub-pagina :
Voer Voor invoeralias CallStream in. Invoeralias is een beschrijvende naam om uw invoer te identificeren. De invoeralias mag alleen alfanumerieke tekens, afbreekstreepjes en onderstrepingstekens bevatten en moet 3 tot 63 tekens lang zijn.
Selecteer voor Abonnement het Azure-abonnement waarin u de Event Hub hebt gemaakt. De event hub kan zich in dezelfde of een ander abonnement als de Stream Analytics-taak bevinden.
Selecteer voor de Event Hubs-naamruimte de Event Hubs-naamruimte die u in de vorige sectie hebt gemaakt. Alle naamruimten die beschikbaar zijn in uw huidige abonnement, worden weergegeven in de vervolgkeuzelijst.
Selecteer de Event Hub die u in de vorige sectie hebt gemaakt voor de naam van de Event Hub. Alle Event Hubs die beschikbaar zijn in de geselecteerde naamruimte, worden weergegeven in de vervolgkeuzelijst.
Houd voor de Event Hub-consumentengroep de optie Nieuwe maken geselecteerd, zodat er een nieuwe consumentengroep wordt gemaakt op de Event Hub. U wordt aangeraden een afzonderlijke consumentengroep te gebruiken voor elke Stream Analytics-taak. Als er geen consumentengroep is opgegeven, gebruikt de Stream Analytics-taak de
$Default
consumentengroep. Wanneer een taak een self-join bevat of meerdere invoerwaarden heeft, kunnen sommige invoer later door meer dan één lezer worden gelezen. Deze situatie is van invloed op het aantal lezers in één consumentengroep.Selecteer voor de verificatiemodus de verbindingsreeks. Het is eenvoudiger om de zelfstudie met deze optie te testen.
Selecteer voor de naam van het Event Hub-beleid bestaande gebruiken en selecteer vervolgens het beleid dat u eerder hebt gemaakt.
Selecteer Opslaan onder aan de pagina.
Een instantie van Azure Cache voor Redis maken
Een cache in Azure Cache voor Redis maken met behulp van de stappen die zijn beschreven in Een cache maken.
Nadat u de cache hebt gemaakt, selecteert u onder Instellingen de optie Toegangssleutels. Noteer de Primaire sleutel van de verbindingsreeks.
Een functie in Azure Functions maken die gegevens naar de Azure Cache voor Redis schrijft
Zie de sectie Een functie-app maken van de Azure Functions-documentatie. Dit voorbeeld is gebaseerd op:
Maak een standaard-HttpTrigger-functie-app in Visual Studio Code door deze zelfstudie te volgen. De volgende informatie wordt gebruikt: taal: , runtime:
C#
.NET 6
(onder functie v4), sjabloon:HTTP trigger
.Installeer de Redis-clientbibliotheek door de volgende opdracht uit te voeren in een terminal die zich in de projectmap bevindt:
dotnet add package StackExchange.Redis --version 2.2.88
Voeg de
RedisConnectionString
enRedisDatabaseIndex
items toe in deValues
sectie van uwlocal.settings.json
, vul de verbindingsreeks van de doelserver in:{ "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "", "FUNCTIONS_WORKER_RUNTIME": "dotnet", "RedisConnectionString": "Your Redis Connection String", "RedisDatabaseIndex":"0" } }
De Redis Database-index is het getal tussen 0 en 15 waarmee de database op het exemplaar wordt geïdentificeerd.
Vervang de volledige functie (.cs bestand in het project) door het volgende codefragment. Werk de naamruimte, de klassenaam en de functienaam zelf bij:
using System; using System.IO; using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.Extensions.Http; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using StackExchange.Redis; namespace Company.Function { public static class HttpTrigger1{ [FunctionName("HttpTrigger1")] public static async Task<IActionResult> Run( [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req, ILogger log) { // Extract the body from the request string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check dynamic data = JsonConvert.DeserializeObject(requestBody); // Reject if too large, as per the doc if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge string RedisConnectionString = Environment.GetEnvironmentVariable("RedisConnectionString"); int RedisDatabaseIndex = int.Parse(Environment.GetEnvironmentVariable("RedisDatabaseIndex")); using (var connection = ConnectionMultiplexer.Connect(RedisConnectionString)) { // Connection refers to a property that returns a ConnectionMultiplexer IDatabase db = connection.GetDatabase(RedisDatabaseIndex); // Parse items and send to binding for (var i = 0; i < data.Count; i++) { string key = data[i].Time + " - " + data[i].CallingNum1; db.StringSet(key, data[i].ToString()); log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}"); // Simple get of data types from the cache string value = db.StringGet(key); log.LogInformation($"Database got: {key} => {value}"); } } return new OkResult(); // 200 } } }
Wanneer Stream Analytics de uitzondering HTTP-aanvraagentiteit te groot ontvangt van de functie, vermindert deze de grootte van de batches die naar functies worden verzonden. De volgende code zorgt ervoor dat er geen te grote batches worden verzonden met Stream Analytics. Zorg dat het maximum aantal batches en de grootten die in de functie worden gebruikt, consistent zijn met de waarden die zijn ingevoerd in de Stream Analytics-portal.
De functie kan nu worden gepubliceerd naar Azure.
Open de functie in Azure Portal en stel toepassingsinstellingen in voor
RedisConnectionString
enRedisDatabaseIndex
.
De Stream Analytics-taak bijwerken met de functie als uitvoer
Open uw Stream Analytics-taak in Azure Portal.
Blader naar uw functie en selecteer Overzicht>Uitvoer>Toevoegen. Selecteer Azure-functie als sinkoptie als u een nieuwe uitvoer wilt toevoegen. De Functions-uitvoeradapter heeft de volgende eigenschappen:
Eigenschapsnaam Beschrijving Uitvoeralias Een gebruiksvriendelijke naam die u in query van de taak gebruikt om naar de uitvoer te verwijzen. Importoptie U kunt de functie van het huidige abonnement gebruiken of de instellingen handmatig opgeven als de functie zich in een ander abonnement bevindt. Functie-app Naam van uw Azure Functions-app. Functie Naam van de functie in uw Azure Functions-app (naam van de functie run.csx). Maximale batchgrootte Hiermee stelt u de maximale grootte (in bytes) in voor elke uitvoerbatch die wordt verzonden naar de functie. Deze waarde is standaard ingesteld op 262.144 bytes (256 kB). Maximum aantal batches Hiermee geeft u het maximum aantal gebeurtenissen in elke batch op die naar de functie worden verzonden. De standaardwaarde is 100. Deze eigenschap is optioneel. Sleutel Hiermee kunt u een functie uit een ander abonnement gebruiken. Geef de waarde op van de sleutel die toegang geeft tot uw functie. Deze eigenschap is optioneel. Geef een naam op voor de uitvoeralias. In deze zelfstudie heet het saop1, maar u kunt elke gewenste naam gebruiken. Vul de andere details in.
Open uw Stream Analytics-taak en werk de query als volgt bij.
Belangrijk
In het volgende voorbeeldscript wordt ervan uitgegaan dat u CallStream hebt gebruikt voor de invoernaam en saop1 voor de uitvoernaam. Als u verschillende namen hebt gebruikt, vergeet dan niet om de query bij te werken.
SELECT System.Timestamp as Time, CS1.CallingIMSI, CS1.CallingNum as CallingNum1, CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2 INTO saop1 FROM CallStream CS1 TIMESTAMP BY CallRecTime JOIN CallStream CS2 TIMESTAMP BY CallRecTime ON CS1.CallingIMSI = CS2.CallingIMSI AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5 WHERE CS1.SwitchNum != CS2.SwitchNum
Start de toepassing telcodatagen.exe door de volgende opdracht uit te voeren in de opdrachtregel. De opdracht maakt gebruik van de notatie
telcodatagen.exe [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours]
.telcodatagen.exe 1000 0.2 2
Start de Stream Analytics-taak.
Op de pagina Monitor voor uw Azure-functie ziet u dat de functie wordt aangeroepen.
Selecteer op de Azure Cache voor Redis pagina uw cache metrische gegevens in het linkermenu, voeg metrische gegevens voor cacheschrijfgegevens toe en stel de duur in op afgelopen uur. U ziet de grafiek die lijkt op de volgende afbeelding.
Azure Cache voor Redis controleren op resultaten
De sleutel ophalen uit Azure Functions-logboeken
Haal eerst de sleutel op voor een record die is ingevoegd in Azure Cache voor Redis. In de code wordt de sleutel berekend in de Azure-functie, zoals wordt weergegeven in het volgende codefragment:
string key = data[i].Time + " - " + data[i].CallingNum1;
db.StringSet(key, data[i].ToString());
log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
Blader naar Azure Portal en zoek uw Azure Functions-app.
Selecteer Functies in het menu links.
Selecteer HTTPTrigger1 in de lijst met functies.
Selecteer Monitor in het linkermenu.
Ga naar het tabblad Logboeken .
Noteer een sleutel uit het informatieve bericht, zoals wordt weergegeven in de volgende schermopname. U gebruikt deze sleutel om de waarde in Azure Cache voor Redis te vinden.
Gebruik de sleutel om de record te vinden in Azure Cache voor Redis
Blader naar Azure Portal en zoek uw Azure Cache voor Redis. Selecteer Console.
Gebruik Azure Cache voor Redis-opdrachten om te controleren of uw gegevens in Azure Cache voor Redis staan. (De opdracht gebruikt de indeling {key}.) Gebruik de sleutel die u hebt gekopieerd uit de monitorlogboeken voor de Azure-functie (in de vorige sectie).
'KEY-FROM-THE-PREVIOUS-SECTION' OPHALEN
Met deze opdracht wordt de waarde voor de opgegeven sleutel afgedrukt:
Foutafhandeling en nieuwe pogingen
Als tijdens het verzenden van gebeurtenissen naar Azure Functions een fout optreedt, worden de meeste bewerkingen opnieuw geprobeerd in Stream Analytics. Alle HTTP-uitzonderingen worden opnieuw geprobeerd totdat ze zijn geslaagd, met uitzondering van http-fout 413 (entiteit te groot). Een fout over een entiteit die te groot is, wordt beschouwd als een gegevensfout die onderhevig is aan het beleid voor opnieuw proberen of verwijderen.
Notitie
De time-out voor HTTP-aanvragen van Stream Analytics naar Azure Functions is ingesteld op 100 seconden. Als het meer dan 100 seconden duurt voordat uw Azure Functions-app een batch verwerkt, worden Stream Analytics-fouten weergegeven en wordt het opnieuw geprobeerd voor de batch.
Het opnieuw proberen van time-outs kan leiden tot dubbele gebeurtenissen die naar de uitvoersink zijn geschreven. Als Stream Analytics een mislukte batch opnieuw probeert te verwerken, gebeurt dat voor alle gebeurtenissen in de batch. Neem bijvoorbeeld een batch van 20 gebeurtenissen die naar Azure Functions worden verzonden vanuit Stream Analytics. Stel dat Azure Functions 100 seconden nodig heeft om de eerste 10 gebeurtenissen in die batch te verwerken. Na 100 seconden onderbreekt Stream Analytics de aanvraag omdat deze geen positief antwoord van Azure Functions heeft ontvangen en er een andere aanvraag voor dezelfde batch wordt verzonden. De eerste tien gebeurtenissen in de batch worden opnieuw verwerkt door Azure Functions, waardoor een duplicaat ontstaat.
Bekende problemen
Wanneer u in Azure Portal de waarde van Maximale batchgrootte/Maximum aantal batches probeert leeg te maken (standaard), verandert de waarde weer in de eerder ingevoerde waarde wanneer u de gegevens opslaat. Voer in dat geval de standaardwaarden voor deze velden handmatig in.
Het gebruik van HTTP-routering in Azure Functions wordt momenteel niet ondersteund in Stream Analytics.
Ondersteuning voor het maken van verbinding met Azure Functions die wordt gehost in een virtueel netwerk, is niet ingeschakeld.
Resources opschonen
Wanneer u een resourcegroep niet meer nodig hebt, verwijdert u de resourcegroep, de streamingtaak en alle gerelateerde resources. Door de taak te verwijderen, voorkomt u dat de streaming-eenheden die door de taak worden verbruikt, in rekening worden gebracht. Als u denkt dat u de taak in de toekomst nog gaat gebruiken, kunt u deze stoppen en later opnieuw starten wanneer dat nodig is. Als u deze taak niet wilt blijven gebruiken, verwijdert u alle resources die in deze quickstart zijn gemaakt door de volgende stappen uit te voeren:
- Selecteer in het menu aan de linkerkant in Azure Portal de optie Resourcegroepen en selecteer vervolgens de resource die u hebt gemaakt.
- Selecteer op de pagina van uw resourcegroep de optie Verwijderen, typ de naam van de resource die u wilt verwijderen in het tekstvak en selecteer vervolgens Verwijderen.
Volgende stappen
In deze zelfstudie hebt u een eenvoudige Stream Analytics-taak gemaakt waarmee een Azure-functie wordt uitgevoerd. Ga voor meer informatie over Stream Analytics-taken verder met de volgende zelfstudie: