Partage via


Streaming avec Orleans

Orleans v.1.0.0 a ajouté la prise en charge des extensions de streaming au modèle de programmation. Les extensions de streaming fournissent un ensemble d’abstractions et d’API qui facilitent et renforcent la réflexion sur les flux et leur utilisation. Les extensions de streaming permettent aux développeurs d’écrire des applications réactives qui fonctionnent sur une séquence d’événements de manière structurée. Le modèle d’extensibilité des fournisseurs de flux rend le modèle de programmation compatible et transposable avec un large éventail de technologies de mise en file d’attente existantes, telles que Event Hubs, ServiceBus, les files d’attente Azure et Apache Kafka. Il n’est pas nécessaire d’écrire de code spécial ou d’exécuter des processus dédiés pour interagir avec ces files d’attente.

Pourquoi s’en soucier ?

Si vous connaissez déjà le traitement de flux et les technologies telles que Event Hubs, Kafka, Azure Stream Analytics, Apache Storm, Apache Spark Streaming et les extensions réactives (Rx) dans .NET, vous vous demandez peut-être pourquoi vous devriez vous en soucier. Pourquoi avons-nous besoin d’un autre système de traitement de flux et comment les acteurs sont-ils liés aux flux ?La section « Pourquoi utiliser les flux Orleans ? » a pour objectif de répondre à cette question.

Modèle de programmation

Il existe plusieurs principes derrière le modèle de programmation des flux Orleans :

  1. Les flux Orleans sont virtuels. Autrement dit, un flux existe toujours. Il n’est pas explicitement créé ni détruit, et il ne peut jamais échouer.
  2. Les flux sont identifiés par des ID de flux, qui sont simplement des noms logiques composés de GUID et de chaînes.
  3. Les flux Orleans vous permettent de dissocier la génération des données de leur traitement, à la fois dans le temps et dans l’espace. Cela signifie que le producteur de flux et le consommateur de flux peuvent se trouver sur des serveurs différents ou dans différents fuseaux horaires, et supporter des échecs.
  4. Les flux Orleans sont légers et dynamiques. Le runtime de streaming Orleans est conçu pour gérer un grand nombre de flux qui vont et viennent à grande vitesse.
  5. Les flux Orleans présentent des liaisons dynamiques. Le runtime de streaming Orleans est conçu pour gérer les cas où des grains se connectent aux flux et s’en déconnectent à grande vitesse.
  6. Le runtime de streaming Orleans gère en toute transparence le cycle de vie de la consommation des flux. Après s’être abonnée à un flux, une application reçoit les événements de ce flux, même en présence d’échecs.
  7. Les flux Orleans fonctionnent uniformément entre les grains et les clients Orleans.

API de programmation

Les applications interagissent avec les flux en utilisant Orleans.Streams.IAsyncStream<T>, qui implémente les interfaces Orleans.Streams.IAsyncObserver<T> et Orleans.Streams.IAsyncObservable<T>. Ces API sont similaires aux extensions réactives (Rx) dans .NET, bien connues.

Dans un exemple typique ci-dessous, un appareil génère des données, qui sont envoyées en tant que requête HTTP au service s’exécutant dans le cloud. Le client Orleans qui s’exécute sur le serveur frontal reçoit cet appel HTTP et publie les données dans un flux d’appareil correspondant :

public async Task OnHttpCall(DeviceEvent deviceEvent)
{
     // Post data directly into the device's stream.
     IStreamProvider streamProvider =
        GrainClient.GetStreamProvider("MyStreamProvider");

    IAsyncStream<DeviceEventData> deviceStream =
        streamProvider.GetStream<DeviceEventData>(
            deviceEvent.DeviceId, "MyNamespace");

     await deviceStream.OnNextAsync(deviceEvent.Data);
}

Dans un autre exemple ci-dessous, un utilisateur de conversation (implémenté en tant que grain Orleans) rejoint une salle de conversation, obtient un handle pour un flux des messages de conversation générés par tous les autres utilisateurs de cette salle, et s’y abonne. Notez que l’utilisateur de conversation n’a pas besoin d’être informé du grain de salle de conversation lui-même (il peut ne pas y avoir un tel grain dans notre système) ni des autres utilisateurs de ce groupe qui produisent des messages. Inutile de dire que, pour publier dans le flux de conversation, les utilisateurs n’ont pas besoin de savoir qui est actuellement abonné à ce flux. Cela montre comment les utilisateurs de conversation peuvent être complètement dissociés dans le temps et l’espace.

public class ChatUser: Grain
{
    public async Task JoinChat(Guid chatGroupId)
    {
        IStreamProvider streamProvider =
            base.GetStreamProvider("MyStreamProvider");

        IAsyncStream<string> chatStream =
            streamProvider.GetStream<string>(chatGroupId, "MyNamespace");

        await chatStream.SubscribeAsync(
            async (message, token) => Console.WriteLine(message))
    }
}

Exemple de démarrage rapide

L’exemple de démarrage rapide présente une vue d’ensemble du workflow global d’utilisation des flux dans l’application. Après l’avoir lu, vous devez lire API de programmation de flux pour mieux comprendre ces concepts.

API de programmation de flux

La section API de programmation de flux fournit une description détaillée des API de programmation.

Fournisseurs de flux

Les flux peuvent provenir de canaux physiques de différentes formes et peuvent avoir des sémantiques différentes. Le streaming Orleans est conçu pour soutenir cette diversité via le concept de fournisseurs de flux, qui est un point d’extensibilité dans le système. Orleans dispose actuellement d’implémentations de fournisseurs à deux flux : le fournisseur de flux de messages simple basé sur TCP et le fournisseur de flux de file d’attente Azure basé sur la file d’attente Azure. Vous trouverez plus d’informations sur les fournisseurs de flux dans Fournisseurs de flux.

Sémantiques de flux

Sémantiques d’abonnement de flux :

Les flux Orleans garantissent une cohérence séquentielle pour les opérations d’abonnement de flux. Plus précisément, lorsqu’un consommateur s’abonne à un flux, une fois que l’élément Task représentant l’opération d’abonnement a été correctement résolu, le consommateur voit tous les événements qui ont été générés après qu’il s’est abonné. En outre, les flux rembobinables vous permettent de vous abonner à partir d’un point arbitraire dans le temps dans le passé en utilisant StreamSequenceToken. Pour plus d’informations, consultez Fournisseurs de flux Orleans.

Garanties de remise d’événements de flux individuels :

Les garanties de remise d’événements individuelles dépendent des fournisseurs de flux individuels. Certains fournissent uniquement une remise de meilleur effort une fois au maximum (par exemple, les flux de messages simples (SMS)), tandis que d’autres fournissent une remise une fois au minimum (comme les flux de file d’attente Azure). Il est même possible de créer un fournisseur de streaming qui garantira une livraison une fois exactement (nous n’avons pas encore de fournisseur de ce type, mais il est possible d’en créer un).

Ordre de remise d’événements :

L’ordre des événements dépend également d’un fournisseur de flux particulier. Dans les flux SMS, le producteur contrôle explicitement l’ordre des événements vus par le consommateur en contrôlant la façon dont il les publie. Les flux de files d’attente Azure ne garantissent pas l’ordre FIFO, car les files d’attente Azure sous-jacentes ne garantissent pas l’ordre en cas d’échec. Les applications peuvent également contrôler leur ordre de remise de flux en utilisant StreamSequenceToken.

Implémentation des flux

L’implémentation des flux Orleans fournit une vue d’ensemble générale de l’implémentation interne.

Exemples de code

D’autres exemples d’utilisation des API de streaming dans un grain sont disponibles ici. Nous envisageons de créer davantage d’exemples dans le futur.

Voir aussi