Share via


StreamInsight et le modèle de conception Observateur

Depuis l’ouverture de ce blog, nous avons au travers d’un certain nombre de billets quelque peu « débroussaillé » l’écosystème Hadoop et tous les projets et outils qui gravitent autour, ainsi que les usages qui peuvent en être fait autour des domaines Volume et Variété tels que caractérisés par le principe des 3V (Volume, Vitesse, Variété) du Gartner de plus en plus démocratisé. (Ces 3 domaines clés permettent de définir ce que sont les Big Data et vous serez amenés très probablement à faire face à chacun des Vs à un degré ou un autre si ce n’est d’ores et déjà le cas.)

Dans le cas présent, on parle de l’écosystème Hadoop pour l’analyse sur des données aussi bien structurées que non-structurées. Mais ici, le terme « non-structuré » désigne une donnée sans relation/contexte. Typiquement, une base de données relationnelle (Microsoft SQL Server par exemple) décrit une forte intégrité aux données qu’elle contient par le biais de relations. Cependant, la non-structure de la donnée ne veut pas dire que la donnée soit désorganisée. Au contraire, avec l’expérience, on se rend compte qu’il nous faudra organiser la donnée avant de procéder à tout traitement Map/Reduce. Dans le cas opposé, l’automatisation du processus de traitement Map/Reduce sera difficile.

Beaucoup de demandes sont exprimées aujourd’hui vis-à-vis de ce type d’usage envisagé ici, demandes qui permettent notamment au décisionnel de prendre la vague des Big Data comme illustré ici.

Mais une fois la décision prise d’investir dans cette direction, plusieurs questions se posent comme typiquement :

  • Est-ce que mes données sont suffisantes ?
  • Si non, comment se constitue-t-on un jeu de données suffisant ?
  • Comment doit-on les organiser ?

Constitution d’un jeu de données exploitables

Si, dans ce contexte, nous connaissons les techniques de traitement (Map/Reduce avec Hadoop), une des grandes problématiques résulte dans la possession de données exploitables.

Pour répondre à cette problématique, nous allons nous appuyer sur la technologie Microsoft StreamInsight. L’architecture de StreamInsight est conçue pour absorber et agréger de nombreuses données, provenant de flux différents. Son utilité première, vous l’aurez donc deviné, est l’analyse en temps réel de données et donc de prendre en considération les domaines Vitesse et Variété du principe des 3V précédent.

Dans ce billet, nous allons en détourner l’usage pour en faire un « d’aspirateur de données ». Une fois une quantité conséquente de données entreposées, nous l’analyserons avec les méthodes Big Data que propose Hadoop.

StreamInsight va nous permettre de constituer notre jeu de données exploitable sur lequel nous pourrons ensuite faire nos analyses, et il le fera de façon automatique, avec un minimum de programmation. Ce cas d’usage de StreamInsight diffère quelque peu de celui illustré dans le cadre des après-midi du dev consacrées aux Big Data : Et la vélocité? StreamInsight (avec du code LINQ).

Dans un cas d’usage réel, l’approche nous permettrait de placer toutes sortes de capteurs/traceurs à différents endroits sur la plateforme de production à exploiter pour la circonstance, enregistrant les actions des utilisateurs. Dans le cadre du tutoriel que nous vous proposons ici, et en l’absence d’une telle plateforme, nous allons simplement utiliser une source régulièrement sollicitée dans les tutoriels sur les Big Data, en l’occurrence Twitter.

Twitter a l’avantage de fournir beaucoup de données, rapidement, et de façon gratuite. Il vous reviendra la tâche d’adapter ensuite ce tutoriel à votre propre utilisation ;)

Présentation de Twitter API

Twitter API est un groupement d’interface fournie aux développeurs donnant accès aux tweets hébergés par le réseau social. Si plusieurs possibilités existent, celle qui nous intéresse s’appelle la Streaming API. L’idée derrière cette fonction offerte par Twitter de façon publique est de donner accès au flux global Twitter sans passer par une boucle envoyant une nouvelle requête HTTP à Twitter.

Cette interface diffère de l’API REST par le fait qu’elle demande une connexion HTTP constamment ouverte. On évite ainsi un renvoie de l’entête HTTP qui consomme de la bande passante. On se représente le flux comme une requête infinie.

Twitter propose plusieurs points de terminaison, correspondant à des cas d’utilisations différents :

  • Public streams : le flux de données publiques, c’est-à-dire l’arrivée en continu des Tweets de comptes publiques.
  • User streams : le flux de Tweets d’un utilisateur en particulier.
  • Site streams : une version multi-utilisateurs du point de terminaison User streams.

Nous utiliserons le premier, Public streams. En ouvrant ce lien proposé par Twitter depuis un browser, vous pouvez apercevoir un texte qui défile au fur-et-à-mesure, ne s’arrêtant jamais. Une ligne équivaut à un nouveau tweet au format JSON (JavaScript Object Notation).

La structure JSON d’un tweet est décrite dans la documentation Twitter ici. Pour notre cas de test, nous ne garderons que certains attributs du tweet :

  • user.screen_name : le nom d’utilisateur postant le tweet.
  • id_str : l’id du tweet.
  • created_at : la date de création.
  • text : le contenu du tweet.
  • lang : la langue de l’utilisateur.

Concepts de base de StreamInsight

StreamInsight est une plate-forme permettant de traiter des événements complexes et d’analyser leurs résultats en temps réels. On parle en anglais de Complex Event Processing (CEP).

CEP est une méthode dite « in-memory » pour l’analyse de flux de données en continu, ceux-ci provenant de sources multiples, basé sur un langage de requête déclaratif - LINQ (.NET Language-Integrated Query) en l’occurrence avec .NET – avec une latence faible sur un grand nombre d’évènements.

Si vous vous demandez quels sont les cas d’usage, imaginez que nous recevions les données de géolocalisations de milliers de téléphones et ce, toutes les secondes. Si vous développez un service qui réceptionne sur un serveur ces informations de géolocalisation, ce dernier sera très vite saturé car il n’est pas adapté à un nombre de requêtes aussi important, à traiter aussi rapidement. On est bien dans le domaine Vitesse. La plateforme StreamInsight ne le sera pas. Dans sa version standard, celle-ci est capable de recevoir 10 000 évènements à la seconde !

Concrètement,  une bibliothèque de classes .NET est mise à notre disposition pour analyser (agréger et corréler) en temps réel les données qui arrivent en continu dans un flux. Pour revenir à notre aspirateur, objet de ce tutoriel, celui-ci doit sur cette base adapter ensuite une sortie pour un ou plusieurs programmes, et ce à des fins de stockage.

En termes de mise en œuvre, cela suppose typiquement de s’appuyer sur les éléments suivants :

  • Adaptateur en entrée (Input Adapter) : un objet de ce type expose une interface pour collecter les données issues de sources différentes (bases de données, services Web, capteurs, flux réseaux sociaux, etc.).
  • Requêtes LINQ : elles agissent comme des filtres permettant de récupérer et d’agréger les données souhaitées. Elles se situent généralement dans la méthode principale d’une application.
  • Adaptateur en sortie (Output Adapter) : un objet de ce type sert à adapter les données ainsi sélectionnées aux utilisations que nous souhaitons en faire : les stocker par exemple dans Windows Azure en vue d’un traitement ultérieur.

Vous trouverez sur la forge CodePlex un exemple d’application StreamInsight dûment documenté. Ce dernier met en scène une analyse de sentiments sur Twitter via une analyse des tweets en temps réel et une visualisation de leurs tendances.

Comme vous pourrez le constater, le concept est très puissant, mais il impose une relative complexité : il est en effet nécessaire de construire pour un adaptateur une classe Factory associée.

A partir de la version 1.1 de StreamInsight, l’équipe en charge du projet StreamInsight au sein du groupe produit SQL Server propose en parallèle une nouvelle façon d’aborder les choses, plus simple, à savoir l’utilisation du pattern (modèle de conception) Observateur (observer), bien connu dans la programmation orientée objet. (L’approche initiale succinctement décrite ci-avant reste toujours prise en charge.)

Modèle de conception Observateur

Ce modèle de conception fait partie des principaux énoncés dans l’ouvrage Design Patterns : Elements of Reusable Object-Oriented Software - l’un des ouvrage de référence sur les patrons de conceptions. Il s’avère particulièrement utile dans une application lorsque l’on a besoin d’ajouter un couplage entre deux objets lors de l’exécution (at runtime). Il agit donc sur le comportement d’un objet, c’est ce que l’on appelle un patron de conception de type comportemental.

Ainsi un objet nommé observable est simplement observé par d’autres objets appelés observateurs. Pour prendre un exemple, on l’utilise fréquemment pour faire de la programmation évènementielle, afin de séparer l’affichage du comportement qui est lié à celle-ci (si on clique sur le bouton Valider, un objet est notifié et se charge de l’enregistrement du formulaire).

On évite grâce à ce procédé de passer par une attente active sur l’objet produisant l’évènement (ce qui se traduit par une boucle infinie testant à chaque tour si l’interface a changée). De plus, cela permet à l’objet observé de ne pas connaitre à l’avance ses observateurs (=suppression de dépendances).

Le schéma ci-dessous explique le déroulement :

image

L’observable, en réalité, possède une liste des observateurs qui se sont abonnés à lui. Ces observateurs implémentent tous une interface spécifique, IObserver, qui définit une fonction précise à appeler lorsqu’un nouvel évènement doit être diffusé.

Pour aller plus loin dans la compréhension du modèle de conception, vous pouvez consulter l’article MSDN Modèle de design observateur.

Le Framework. NET apporte sa propre implémentation du modèle de conception Observateur depuis la version 4.0, ainsi que, dans le contexte qui nous intéresse avec StreamInsight, au sein d’une bibliothèque appelée Reactive Extensions (Rx). (Cette bibliothèque a été créée de manière indépendante de StreamInsight, mais StreamInsight repose dessus). Vous y trouverez par exemple l’interface IObserver<T> ainsi que l’interface IObservable<T> .

Avec la version 2.1 de StreamInsight, des méthodes d’extensions sont disponibles, elles permettent ni plus ni moins de transformer un objet implémentant l’interface IObservable<T> en un flux (CepStream, la classe de base d’un flux StreamInsight).

A l’arrivée, l’approche autorise un bénéfice non négligeable, à savoir le fait d’écrire son application sans aucune compréhension préalable du concept de CEP, et ce simplement via l’utilisation d’un modèle de conception simple et bien connu.

StreamInsight se charge de « manager » votre code et de transformer vos classes en objets Cep (une inversion de contrôle est effectuée).

Conception de notre application AspirateurTwitter exemple

Après toutes ces explications, nous pouvons passer à la modélisation de notre propre exemple application CEP, en utilisant le modèle de conception Observateur. Le code de source de l'application est disponible en pièce jointe sous forme de fichier archive .zip en bas de ce billet. N'hésitez pas à le consulter en parallèle des explications ci-dessous.

Un peu d’UML

 

image

Voilà quelques petites précisions :

  1. Nous définissons une classe Tweet, simple POCO (Plain Old CLR Object). Notre identifiant est un entier base 64, le reste est stocké dans des chaines de caractères. Cette classe surcharge la méthode ToString() qui sera utilisée pour remplir un blob Azure.
  2. Notre source de données est décrite dans la classe SourceTwitter. Cette classe implémente l’interface IObservable<T> qui définit une seule méthode : Subscribe(). Elle est appelée lorsqu’un observateur veut s’abonner (=souscrire) à la source. Dès lors, on ajoute le nouvel arrivant dans la liste des observateurs. De plus, nous définissons ici une méthode interne, notifierObservateurs, qui a pour rôle l’appel de la méthode OnNext() de chaque observateur abonné.
  3. Notre observateur du flux Twitter est décrit dans la classe ObservateurTwitter, qui implémente l’interface IObserver<T>. Il y a une surcharge de la méthode OnNext(), prenant un objet Tweet comme paramètre, de la méthode OnError() et OnCompleted(). Cette dernière méthode est invoquée pour désabonner l’observateur de la source.

Souscription

Dans notre proposition de mise en œuvre, la classe Souscription est un peu à part. Ce qu’il faut en savoir est qu’elle est nécessaire pour désabonner convenablement l’observateur de l’observable. Lorsqu’un objet de type IObserver veut quitter la liste des notifiés, il déclenche sa méthode IObserver.OnCompleted() qui invoque la méthode Souscription.Dispose().

Avant cela, une instance de Souscription est retournée par la méthode SourceTwitter.Subscribe()au moment où un nouvel observateur veut s’abonner. Elle est composée de l’observateur et de l’observable.

Pourquoi cette classe est-elle nécessaire ? Elle découple l’observateur de l’observable tout simplement. Dans notre application AspirateurTwitter, vous la trouverez comme classe interne à SourceTwitter.

Invocation de l’API Twitter Streaming

Notre invocation se situe dans le constructeur de la classe SourceTwitter. Notre exemple est simple et n’a pas la prétention d’une application de production.

Dans ce constructeur, on peut voir une création de tâche via la bibliothèque parallèle de tâches TPL (Task Parallel Library) qui s’exécutera donc sur un thread différent, pour ne pas bloquer le programme. A l’intérieur, une boucle infinie est faite sur un flux (au travers d’un StreamReader) provenant d’une requête HTTP (sur Twitter Streaming). Pour chaque itération, on récupère la nouvelle ligne dans une chaine de caractères, puis on lance la phase de notification de tous les observateurs (grâce à la méthode notifierObservateurs).

Le fichier App.config contient les définitions de deux paramètres Twitter que vous aurez à remplacer pour tester le code proposé pour l’illustration :

  • twitterAccount : le nom du compte de stockage Windows Azure.
  • twitterPass : la clef associée au compte ci-dessus.

L’utilisation de ces paramètres s’appuie sur la classe ConfigurationManager proposé par le Framework .NET. (Vous pouvez consulter un tutoriel ici à ce sujet pour plus d’information).

Filtrage avec LINQ

Dans la classe Program, puis la méthode Main(), nous créons en premier lieu une instance d’une nouvelle source SourceTwitter, puis nous la transformons en objet CepStream grâce à la méthode d’extension ToPointStream(). Vient ensuite ce qui nous intéresse, à savoir le filtre LINQ. Il s’agit d’une simple requête LINQ comme on a l’habitude d’en faire sur le CepStream récupéré précédemment :

var filtre = from tweet where tweet.Lang == “fr” in source select tweet;

 

Cette simple ligne permet de sélectionner les tweets de langue française. A vous de modifier selon votre besoin ;)

Passons à la suite de la méthode Main(), nous transformons cet objet filtre en un objet Observable avec la méthode d’extension ToPointObservable(). Nous souscrivons notre objet ObservateurTwitter à cet objet Observable qui le notifiera à son tour pour chaque nouveau tweet de langue française !

Bien sûr il vous est possible d’ajouter un deuxième voir même - soyons fous ;) - un troisième observateur de cette requête. Il y aurait par exemple un observateur qui se charge de stocker les données sur un compte de stockage Windows Azure dans le Cloud, un autre alimentant une interface de visualisation sur un site Web, proposant ainsi un affichage temps réel d’un graphique, etc.

Stockage des tweets sur Windows Azure

Si vous êtes attentifs, la classe ObservateurTwitter possède des méthodes privées ajouterAuBuffer()et telechargerSurAzure(). En effet, nous avons décidé de mettre directement dans notre observateur la logique de téléchargement de nos tweets sur un compte de stockage Windows Azure. Un attribut buffer - tableau de bytes - stocke les tweets au fur et à mesure. Un index est incrémenté pour remplir le buffer, et lorsque celui-ci arrive à équivalence de la taille totale du buffer, la méthode telechargerSurAzure() est appelée. Cette méthode se charge de récupérer le buffer pour le télécharger comme objet blob sur un conteneur Azure en particulier.

Le fichier App.config contient les définitions de trois paramètres Windows Azure que vous aurez à remplacer pour tester le code :

  • accountName : le nom du compte de stockage Windows Azure.
  • accountKey : la clef associée au compte ci-dessus.
  • containerRef : le nom du conteneur principal sur lequel vous voulez stocker les blobs.

Déploiement de notre application AspirateurTwitter

Passons maintenant au déploiement de notre solution nommée « AspirateurTwitter ».

Installation de StreamInsight Server 2.1

Vous trouverez le guide pas à pas d’installation en français ici. Suivez les instructions, mais faites attention à la version de StreamInsight qui est indiquée dans un lien pour le télécharger. A l’heure actuelle, ce lien pointe sur une version antérieure à la 2.1, néanmoins vous pouvez retrouver la version 2.1 sur la page suivante : Microsoft SQL Server StreamInsight 2.1.

Note : la version dite « client » ne nous servira pas vis-à-vis de ce tutoriel.

Une fois l’installation terminée, veuillez garder en mémoire le nom du serveur d’instance StreamInsight que vous avez ajouté lors de l’installation. Il nous servira par la suite.

Configuration de l’application AspirateurTwitter

Le code source de l’application est proposé en pièce jointe de ce billet. Ouvrez la solution dans Visual Studio ; de notre côté, nous avons utilisé la version 2012.

Le projet utilise trois bibliothèques :

  • Microsoft CEP, installé avec StreamInsight. Vous pouvez y accéder dans l’onglet Extensions du Reference Manager. Une bibliothèque de méthodes d’extensions est aussi installée automatiquement et utilisée pour le modèle de conception Observateur : Reactive Extensions (Rx).
  • Newtonsoft.Json, qui est ajoutée avec NuGet.
  • Microsoft Windows Azure SDK que vous pouvez aussi récupérer avec NuGet, ou bien installer la plateforme Windows Azure.

Si vous ne connaissez pas NuGet, pas de panique car il est intégré à Visual Studio 2012. (Pour les versions antérieures, il vous faudra installer une extension.) NuGet un gestionnaire de paquets qui télécharge automatiquement les bibliothèques externes dont vous avez besoin dans votre projet, celles-ci étant ensuite mises à jour automatiquement. Vous trouverez un tutoriel sur l’utilisation de NuGet ici.

Code Source de l’application AspirateurTwitter

Le projet proposé contient un fichier de configuration (« .config ») et 4 fichiers sources (« .cs ») :

  • App.config est un fichier XML que vous devez éditer pour modifier les variables de configuration Twitter, Windows Azure et StreamInsight :

- <appSettings>

-     <!--Editez chaque valeur pour correspondre à vos identifiants-->

-     <add key="accountName" value="votre_compte_stockage_azure"/>

-     <add key="accountKey" value="votre_clef_de_stockage_azure"/>

-     <add key="containerRef" value="votre_conteneur_principal_azure"/>

-     <add key="twitterAccount" value="votre_identifiant_twitter"/>

-     <add key="twitterPass" value="votre_mot_de_passe_twitter"/>

-     <add key="serveurStreamInsight" value="nom_du_serveur_stream_insight"/>

  </appSettings>

  • Program.cs est la classe principale de notre application. La fonction Main() est exécutée et contient les instructions d’instanciation de nos classe Observable et Observateur.
  • SourceTwitter.cs est la classe implémentant IObservable<T>. La méthode Subscribe() est la seule surcharge nécessaire à réaliser pour répondre au contrat de IObservable<T>. Dans cette classe, on écoute l’API Streaming de Twitter, on parse le texte au format JSON renvoyé, puis on le stocke dans un objet de type Tweet.cs et enfin, on notifie les observateurs.
  • ObservateurTwitter.cs est la classe implémentant IObserver<T>. Les méthodes OnNext(), OnCompleted() et OnError() sont nécessaire à la validation du contrat.
  • Tweet.cs est la classe de base stockant un tweet sous forme d’objet. La méthode ToString() retourne une chaine de caractères formatée de la manière suivant :

 <id>[tabulation]<createdAt>[tabulation]<userName>[tabulation]<text>

Le schéma ci-dessous indique le cheminent d’un tweet dans notre application AspirateurTwitter:

image

Pourquoi une taille de 255 Mo (267386880 octets) ? Par expérience, le traitement Map/Reduce d’une (possible future) application Big Data sur Windows Azure HDInsight lit de manière distribuée les données contenues dans un seul conteneur par block de 255 Mo. Avec cette astuce, nous optimisons le traitement futur et évitons ainsi une baisse de performance. Maintenant, vous pouvez tester avec une valeur inférieure - modifiez la constante dans ObservateurTwitter tailleBuffer - sinon cela risque d’être long avant que vous puissiez voir un résultat.

Une dernière chose à préciser, le stockage dans Windows Azure d’un blob s’effectue dans un dossier « input » (en partant de racine du conteneur), puis est répertorié par date de téléchargement. Le fichier possède un nom unique, en réalité un nombre représentant le TimeStamp du système d’exploitation lors de l’enregistrement du fichier.

En guise de conclusion

Ce tutoriel tire à sa fin. Ce dernier a illustré comment écrire une application CEP sur StreamInsight 2.1 avec le modèle de conception Observateur, et ce, avec un minimum de code. Comme indiqué précédemment, le code source est accessible dans le projet AspirateurTwitter proposé en pièce jointe de ce billet.

Si vous avez envie de pousser plus loin vos investigations sur cette technologie StreamInsight, voici quelques ressources utiles :

Pour vous tenir informé(e) des nouveautés et exemples, n’hésitez pas à consulter le blog MSDN dédié. Mentionnons enfin un billet sur les macros LINQ qui détaille quelques cas d’utilisation.

Merci de votre lecture et à très bientôt pour de nouvelles découvertes !

imageimage

AspirateurTwitter.zip