Orleans API потоковой передачи
Приложения взаимодействуют с потоками через API, которые очень похожи на известные реактивные расширения (Rx) в .NET. Основное различие заключается в том, что Orleans расширения потоков являются асинхронными, чтобы сделать обработку более эффективной в Orleansраспределенной и масштабируемой вычислительной структуре.
Асинхронный поток
Приложение начинается с использования поставщика потоков для получения дескриптора в поток. Дополнительные сведения о поставщиках потоков можно прочитать здесь, но на данный момент можно рассматривать как фабрику потоков, которая позволяет реализующим настраивать поведение потоков и семантику:
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");
Приложение может получить ссылку на поставщика потоков либо путем вызова Grain.GetStreamProvider метода, когда находится внутри зерна, либо путем вызова GrainClient.GetStreamProvider метода при использовании клиента.
Orleans.Streams.IAsyncStream<T> — это логический, строго типизированный дескриптор виртуального потока. Это похоже на " Orleans Ссылка на зерно". GetStreamProvider
Вызовы и GetStream
являются исключительно локальными. Аргументы, которые GetStream
должны быть GUID и дополнительной строкой, которую мы называем пространством имен потока (которое может быть null). Вместе GUID и строка пространства имен состоят из удостоверений потока (аналогично аргументам IGrainFactory.GetGrain). Сочетание guid и строки пространства имен обеспечивает дополнительную гибкость при определении удостоверений потока. Точно так же, как зерно 7 может существовать в пределах типа "Зерно", а другое зерно 7 может существовать в пределах типа PlayerGrain
ChatRoomGrain
зерна, поток 123 может существовать с пространством PlayerEventsStream
имен потока, а другой поток 123 может существовать в пространстве ChatRoomMessagesStream
имен потока.
Производство и потребление
IAsyncStream<T> реализует как IAsyncObserver<T> интерфейсы, так и IAsyncObservable<T> интерфейсы. Таким образом, приложение может использовать поток либо для создания новых событий в потоке, либо Orleans.Streams.IAsyncObserver<T>
для подписки на события из потока и их использования с помощью Orleans.Streams.IAsyncObservable<T>
.
public interface IAsyncObserver<in T>
{
Task OnNextAsync(T item, StreamSequenceToken token = null);
Task OnCompletedAsync();
Task OnErrorAsync(Exception ex);
}
public interface IAsyncObservable<T>
{
Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}
Чтобы создать события в потоке, приложение просто вызывает
await stream.OnNextAsync<T>(event)
Чтобы подписаться на поток, вызывается приложение
StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)
Аргумент SubscribeAsync может быть объектом, реализующим IAsyncObserver<T> интерфейс или сочетание лямбда-функций для обработки входящих событий. Дополнительные варианты SubscribeAsync
доступны через AsyncObservableExtensions класс. SubscribeAsync
StreamSubscriptionHandle<T>возвращает непрозрачный дескриптор, который можно использовать для отмены подписки из потока (аналогично асинхронной версииIDisposable).
await subscriptionHandle.UnsubscribeAsync()
Важно отметить, что подписка предназначена для зерна, а не для активации. После подписки на поток код зерна эта подписка превышает жизнь этой активации и остается постоянной до тех пор, пока код зерна (потенциально в другой активации) явно отменяет подписку. Это сердце абстракции виртуального потока: не только всегда существуют все потоки, логически, но и подписка на поток является устойчивой и живет за пределами определенной физической активации, которая создала подписку.
Кратность
Поток Orleans может содержать несколько производителей и нескольких потребителей. Сообщение, опубликованное производителем, будет доставлено всем потребителям, которые были подписаны на поток до публикации сообщения.
Кроме того, потребитель может подписаться на один поток несколько раз. Каждый раз, когда он подписывается, он возвращает уникальный StreamSubscriptionHandle<T>. Если в одном потоке подписана функция X (или клиент), она будет получать одно и то же событие X, однократно для каждой подписки. Потребитель также может отменить подписку из отдельной подписки. Он может найти все текущие подписки, вызвав:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Восстановление после сбоев
Если производитель потока умирает (или его зерно деактивировано), ничего не нужно делать. В следующий раз, когда это зерно хочет создать больше событий, он может снова получить дескриптор потока и создать новые события таким же образом.
Логика потребителя немного более вовлечена. Как уже говорилось ранее, после подписки на поток потребитель эта подписка действительна до явного отмены подписки. Если потребитель потока умирает (или его зерно деактивировано) и создается новое событие в потоке, потребитель будет автоматически повторно активирован (как и при отправке сообщения в него автоматически активируется любое регулярное Orleans зерно). Единственное, что необходимо сделать в коде зерна, заключается в том, чтобы предоставить обработку IAsyncObserver<T> данных. Потребитель должен повторно подключить логику обработки в рамках OnActivateAsync() метода. Для этого он может вызвать следующее:
StreamSubscriptionHandle<int> newHandle =
await subscriptionHandle.ResumeAsync(IAsyncObserver);
Потребитель использует предыдущий дескриптор, который он получил при первой подписке на "возобновление обработки". Обратите внимание, что ResumeAsync просто обновляет существующую подписку с новым экземпляром логики IAsyncObserver
и не изменяет тот факт, что этот потребитель уже подписан на этот поток.
Как потребитель получает старую subscriptionHandle
? Существует 2 варианта. Потребитель, возможно, сохранил дескриптор, который он был возвращен из исходной SubscribeAsync
операции и может использовать его сейчас. Кроме того, если у потребителя нет дескриптора, он может попросить IAsyncStream<T>
все его активные дескриптор подписки, вызвав:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Теперь потребитель может возобновить все из них или отменить подписку от некоторых, если он хочет.
Совет
Если потребитель реализует IAsyncObserver<T> интерфейс напрямую (public class MyGrain<T> : Grain, IAsyncObserver<T>
), он не должен быть обязательным для повторного подключения IAsyncObserver
и, таким образом, не потребуется вызывать ResumeAsync
. Среда выполнения потоковой передачи должна автоматически определить, что зерно уже реализует IAsyncObserver
и будет вызывать эти IAsyncObserver
методы. Однако среда выполнения потоковой передачи в настоящее время не поддерживает этот код, и код зерна по-прежнему должен явно вызываться ResumeAsync
, даже если зерно реализует IAsyncObserver
напрямую.
Явные и неявные подписки
По умолчанию потребитель потока должен явно подписаться на поток. Обычно эта подписка активируется каким-либо внешним сообщением, которое получает зерно (или клиент), которое указывает ему подписаться. Например, в службе чата, когда пользователь присоединяется JoinChatGroup
к комнате чата, его зерно получает сообщение с именем чата, что приведет к тому, что пользователь будет подписываться на этот поток чата.
Кроме того, Orleans потоки также поддерживают неявные подписки. В этой модели зерно явно не подписывается на поток. Это зерно подписывается автоматически, неявно, только на основе его удостоверений зерна и ImplicitStreamSubscriptionAttribute. Основное значение неявных подписок позволяет потоковым действиям активировать активацию зерна (поэтому активация подписки) автоматически. Например, при использовании SMS-потоков, если одно зерно хотело создать поток и другой процесс зерна этого потока, производитель должен был бы знать личность потребительского зерна и сделать ему вызов зерна, чтобы он говорил ему подписаться на поток. Только после этого он может начать отправку событий. Вместо этого, используя неявные подписки, производитель может просто начать производить события в потоке, и потребитель зерна автоматически активируется и подписывается на поток. В этом случае продюсер не заботится вообще, кто читает события
Реализация зерна MyGrainType
может объявлять атрибут [ImplicitStreamSubscription("MyStreamNamespace")]
. Это сообщает среде выполнения потоковой передачи, что при создании события в потоке, удостоверение которого — GUID XXX и "MyStreamNamespace"
пространство имен, оно должно быть доставлено в зерно, удостоверение которого имеет тип MyGrainType
XXX. То есть среда выполнения сопоставляет поток <XXX, MyStreamNamespace>
с потребительским зерном <XXX, MyGrainType>
.
Наличие ImplicitStreamSubscription
среды выполнения потоковой передачи автоматически подписывает это зерно в поток и передает события потока. Однако код зерна по-прежнему должен сообщить среде выполнения, как он хочет обрабатывать события. По сути, оно должно быть присоединено IAsyncObserver
. Поэтому при активации зерна код зерна внутри OnActivateAsync
должен вызываться:
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId =
StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
streamProvider.GetStream<T>(streamId);
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream =
streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
Написание логики подписки
Ниже приведены рекомендации по написанию логики подписки для различных случаев: явные и неявные подписки, перемоткаемые и неотменяемые потоки. Основное различие между явными и неявными подписками заключается в том, что для неявного зерна всегда есть ровно одна неявная подписка для каждого пространства имен потока; Нет способа создать несколько подписок (нет кратности подписки), нет способа отмены подписки, а логика зерна всегда должна быть присоединена только к логике обработки. Это также означает, что для неявных подписок никогда не требуется возобновить подписку. С другой стороны, для явных подписок необходимо возобновить подписку, в противном случае, если зерно подписывается снова, это приведет к тому, что зерно подписывается несколько раз.
Неявные подписки:
Для неявных подписок зерна по-прежнему необходимо подписаться на присоединение логики обработки. Это можно сделать в зерне потребителя, реализуя IStreamSubscriptionObserver
и IAsyncObserver<T>
интерфейсы, позволяя зернам активировать отдельно от подписки. Чтобы подписаться на поток, зерно создает дескриптор и вызовы await handle.ResumeAsync(this)
в методе OnSubscribed(...)
.
Для обработки сообщений IAsyncObserver<T>.OnNextAsync(...)
метод реализуется для получения потоковой передачи данных и маркера последовательности. Кроме того, метод может принимать набор делегатов, ResumeAsync
представляющих методы IAsyncObserver<T>
интерфейса, onNextAsync
onErrorAsync
и onCompletedAsync
.
public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
_logger.LogInformation($"Received an item from the stream: {item}");
}
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
var handle = handleFactory.Create<string>();
await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(
this.GetPrimaryKey(), "MyStreamNamespace");
await stream.SubscribeAsync(OnNextAsync);
}
Явные подписки:
Для явных подписок зерна должно вызывать SubscribeAsync
подписку на поток. При этом создается подписка, а также подключается логика обработки. Явная подписка будет существовать до отмены подписки, поэтому если зерно будет деактивировано и повторно активировано, зерно по-прежнему явно подписано, но логика обработки не будет присоединена. В этом случае зерна необходимо повторно подключить логику обработки. Чтобы сделать это, в своем OnActivateAsync
зерне сначала необходимо выяснить, какие подписки у него есть, вызвав IAsyncStream<T>.GetAllSubscriptionHandles(). Зерно должно выполняться ResumeAsync
для каждого дескриптора, с которым он хочет продолжить обработку или отменить подпискуAsync на любых дескрипторах, с которыми он выполняется. При необходимости вы можете указать StreamSequenceToken
аргумент для ResumeAsync
вызовов, что приведет к тому, что эта явная подписка начнет использовать этот маркер.
public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
var stream = streamProvider.GetStream<string>(streamId);
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
foreach (var handle in subscriptionHandles)
{
await handle.ResumeAsync(this);
}
}
public async override Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
if (!subscriptionHandles.IsNullOrEmpty())
{
subscriptionHandles.ForEach(
async x => await x.ResumeAsync(OnNextAsync));
}
}
Потоковые маркеры порядка и последовательности
Порядок доставки событий между отдельным производителем и отдельным потребителем зависит от поставщика потоков.
С помощью SMS производитель явно управляет порядком событий, замеченных потребителем, управляя способом публикации продюсером. По умолчанию (если для поставщика SMS задано значение false), и если SimpleMessageStreamProviderOptions.FireAndForgetDelivery производитель ожидает каждого OnNextAsync
вызова, события поступают в заказ FIFO. В SMS это до производителя, чтобы решить, как обрабатывать сбои доставки, которые будут указываться сбоем Task
, возвращенным вызовом OnNextAsync
.
Потоки очередей Azure не гарантируют порядок FIFO, так как базовые очереди Azure не гарантируют порядок в случаях сбоя. (Они гарантируют порядок FIFO в выполнении без сбоев.) Когда производитель создает событие в очередь Azure, если операция очереди завершается сбоем, производитель пытается выполнить попытку другой очереди и более поздних версий с потенциальными повторяющимися сообщениями. На стороне Orleans доставки среда выполнения потоковой передачи отменяет событие из очереди и пытается доставить его для обработки потребителям. Среда Orleans выполнения потоковой передачи удаляет событие из очереди только после успешной обработки. Если доставка или обработка завершается ошибкой, событие не удаляется из очереди и автоматически будет автоматически отображаться в очереди позже. Среда выполнения потоковой передачи попытается снова доставить его, что потенциально нарушает порядок FIFO. Приведенное выше поведение соответствует нормальной семантике очередей Azure.
Определяемый приложением порядок. Чтобы справиться с указанными выше проблемами упорядочивания, приложение может дополнительно указать его порядок. Это достигается с помощью StreamSequenceTokenнепрозрачного IComparable объекта, который можно использовать для упорядочивания событий. Продюсер может передать необязательный StreamSequenceToken
вызов OnNext
. Это StreamSequenceToken
будет передано потребителю и будет доставлено вместе с событием. Таким образом, приложение может причинить и восстановить порядок независимо от среды выполнения потоковой передачи.
Перемыкаемые потоки
Некоторые потоки позволяют приложению подписываться только на них, начиная с последней точки во времени, а другие потоки позволяют "вернуться в время". Последняя возможность зависит от базовой технологии очередей и конкретного поставщика потоков. Например, очереди Azure позволяют использовать только последние события, в то время как EventHub позволяет повторять события из произвольной точки во времени (до некоторого времени окончания срока действия). Потоки, поддерживающие возвращение во времени, называются перемыкаемыми потоками.
Потребитель перемотываемого потока может передать StreamSequenceToken
SubscribeAsync
вызов. Среда выполнения будет доставлять события в него начиная с этого StreamSequenceToken
. Маркер NULL означает, что потребитель хочет получать события начиная с последней версии.
Возможность перемотки потока очень полезна в сценариях восстановления. Например, рассмотрим зерно, подписывающееся на поток, и периодически контрольные точки его состояния вместе с последним маркером последовательности. При восстановлении после сбоя зерно может повторно подписаться на тот же поток из последнего маркера последовательности контрольных точек, тем самым не теряя никаких событий, созданных с момента последней контрольной точки.
Поставщик Центров событий перемотывается. Его код можно найти на сайте GitHub: Orleans/Azure/Orleans. Streaming.EventHubs. Поставщики sms и очередей Azure не перемотываются.
Автоматическое горизонтальное масштабирование без отслеживания состояния
По умолчанию Orleans потоковая передача предназначена для поддержки большого количества относительно небольших потоков, каждый из которых обрабатывается одним или несколькими зернами с отслеживанием состояния. В совокупности обработка всех потоков вместе сегментирована среди большого количества регулярных (с отслеживанием состояния) зерна. Код приложения управляет этим сегментированием путем назначения идентификаторов потока и идентификаторов зерна и явно подписыванием. Цель — сегментированная обработка с отслеживанием состояния.
Однако существует также интересный сценарий автоматической горизонтальной обработки без отслеживания состояния. В этом сценарии приложение имеет небольшое количество потоков (или даже одного большого потока), а цель — без отслеживания состояния. Примером является глобальный поток событий, в котором обработка включает декодирование каждого события и потенциально переадресацию его в другие потоки для дальнейшей обработки состояния. Обработка потока без отслеживания состояния без отслеживания состояния может поддерживаться с Orleans помощью StatelessWorkerAttribute зерна.
Текущее состояние автоматической обработки без отслеживания состояния без отслеживания состояния. Это еще не реализовано. Попытка подписаться на поток из StatelessWorker
зерна приведет к неопределенному поведению. Мы рассмотрим этот вариант.
Зерна и Orleans клиенты
Orleans потоки работают равномерно между зернами и Orleans клиентами. То есть те же API можно использовать внутри зерна и в клиенте Orleans для создания и использования событий. Это значительно упрощает логику приложения, делая специальные клиентские API, такие как наблюдатели зерна, избыточные.
Полностью управляемая и надежная потоковая передача pub-sub
Для отслеживания подписок Orleans потоков использует компонент среды выполнения с именем Streaming Pub-Sub , который служит в качестве точки отступа для потребителей потоков и производителей потоков. Pub-sub отслеживает все подписки потоков и сохраняет их и сопоставляет потребителей потоков с производителями потоков.
Приложения могут выбирать, где и как хранятся данные Pub-Sub. Сам компонент Pub-Sub реализуется как зерна (называемый PubSubRendezvousGrain
), который использует Orleans декларативное сохраняемость. PubSubRendezvousGrain
использует поставщик хранилища с именем PubSubStore
. Как и в случае с любым зерном, можно назначить реализацию для поставщика хранилища. Для потоковой передачи PubSubStore
Pub-Sub можно изменить реализацию во время строительства silo с помощью построителя узлов silo:
Ниже описано, как настроить pub-Sub для хранения состояния в таблицах Azure.
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");
Таким образом данные Pub-Sub будут надежно храниться в таблице Azure. Для первоначальной разработки также можно использовать хранилище памяти. Помимо Pub-Sub Orleans среда выполнения потоковой передачи предоставляет события от производителей потребителям, управляет всеми ресурсами среды выполнения, выделенными для активно используемых потоков, и прозрачно мусор собирает ресурсы среды выполнения из неиспользуемых потоков.
Настройка
Чтобы использовать потоки, необходимо включить поставщиков потоков через узел silo или построитель клиентов кластера. Дополнительные сведения о поставщиках потоков см. здесь. Пример настройки поставщика потоков:
hostBuilder.AddMemoryStreams("StreamProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConfigureTableServiceClient("<Secret>")))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConnectionString = "<Secret>"))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");