Sdílet prostřednictvím


Jak používat rozhraní API pro komunikaci Reliable Services

Azure Service Fabric jako platforma je zcela nezávislá na komunikaci mezi službami. Všechny protokoly a zásobníky jsou přijatelné od UDP do HTTP. Je na vývojáři služeb, aby zvolil, jak mají služby komunikovat. Aplikační architektura Reliable Services poskytuje integrované komunikační zásobníky a také rozhraní API, která můžete použít k sestavení vlastních komunikačních komponent.

Nastavení komunikace služby

Rozhraní API Reliable Services používá pro komunikaci služeb jednoduché rozhraní. Pokud chcete otevřít koncový bod pro vaši službu, jednoduše implementujte toto rozhraní:


public interface ICommunicationListener
{
    Task<string> OpenAsync(CancellationToken cancellationToken);

    Task CloseAsync(CancellationToken cancellationToken);

    void Abort();
}

public interface CommunicationListener {
    CompletableFuture<String> openAsync(CancellationToken cancellationToken);

    CompletableFuture<?> closeAsync(CancellationToken cancellationToken);

    void abort();
}

Implementaci naslouchacího procesu komunikace pak můžete přidat tak, že ji vrátíte v přepsání metody třídy založené na službě.

Bezstavové služby:

public class MyStatelessService : StatelessService
{
    protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
    {
        ...
    }
    ...
}
public class MyStatelessService extends StatelessService {

    @Override
    protected List<ServiceInstanceListener> createServiceInstanceListeners() {
        ...
    }
    ...
}

Stavové služby:

    @Override
    protected List<ServiceReplicaListener> createServiceReplicaListeners() {
        ...
    }
    ...
public class MyStatefulService : StatefulService
{
    protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
    {
        ...
    }
    ...
}

V obou případech vrátíte kolekci naslouchacích procesů. Použití více naslouchacích procesů umožňuje službě naslouchat na více koncových bodech, potenciálně pomocí různých protokolů. Můžete mít například naslouchací proces HTTP a samostatný naslouchací proces WebSocket. Můžete migrovat z nezabezpečeného na zabezpečení vzdálené komunikace tím, že nejprve povolíte oba scénáře tím, že budete mít nezabezpečený naslouchací proces i zabezpečený naslouchací proces. Každý naslouchací proces získá název a výsledná kolekce názvu: dvojice adres jsou reprezentovány jako objekt JSON, když klient požaduje adresy naslouchání pro instanci služby nebo oddíl.

V bezstavové službě vrátí přepsání kolekci ServiceInstanceListeners. A ServiceInstanceListener obsahuje funkci, která ICommunicationListener(C#) / CommunicationListener(Java) vytvoří a pojmenuje ji. Pro stavové služby vrátí přepsání kolekci ServiceReplicaListeners. To se mírně liší od bezstavového protějšku, protože ServiceReplicaListener má možnost otevřít na ICommunicationListener sekundárních replikách. Ve službě můžete používat nejen několik naslouchacích procesů komunikace, ale můžete také určit, které naslouchací procesy přijímají požadavky na sekundární repliky a které naslouchají jenom na primárních replikách.

Můžete mít například ServiceRemotingListener, který přijímá volání RPC pouze na primárních replikách, a druhý vlastní naslouchací proces, který přijímá požadavky na čtení na sekundárních replikách přes HTTP:

protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
{
    return new[]
    {
        new ServiceReplicaListener(context =>
            new MyCustomHttpListener(context),
            "HTTPReadonlyEndpoint",
            true),

        new ServiceReplicaListener(context =>
            this.CreateServiceRemotingListener(context),
            "rpcPrimaryEndpoint",
            false)
    };
}

Poznámka:

Při vytváření více naslouchacích procesů pro službu musí mít každý naslouchací proces jedinečný název.

Nakonec popište koncové body potřebné pro službu v manifestu služby v části o koncových bodech.

<Resources>
    <Endpoints>
      <Endpoint Name="WebServiceEndpoint" Protocol="http" Port="80" />
      <Endpoint Name="OtherServiceEndpoint" Protocol="tcp" Port="8505" />
    <Endpoints>
</Resources>

Naslouchací proces komunikace má přístup k prostředkům koncového bodu, které mu byly přiděleny, z objektu v objektu CodePackageActivationContext ServiceContext. Naslouchací proces pak může při otevření začít naslouchat žádostem.

var codePackageActivationContext = serviceContext.CodePackageActivationContext;
var port = codePackageActivationContext.GetEndpoint("ServiceEndpoint").Port;

CodePackageActivationContext codePackageActivationContext = serviceContext.getCodePackageActivationContext();
int port = codePackageActivationContext.getEndpoint("ServiceEndpoint").getPort();

Poznámka:

Prostředky koncových bodů jsou společné pro celý balíček služby a služba Service Fabric je při aktivaci balíčku služby přiděluje. Stejný port může sdílet několik replik služeb hostovaných ve stejném hostiteli služby. To znamená, že naslouchací proces komunikace by měl podporovat sdílení portů. Doporučeným způsobem je, aby naslouchací proces komunikace používal ID oddílu a ID repliky nebo instance při generování adresy naslouchání.

Registrace adresy služby

Systémová služba označovaná jako Služba pojmenování běží v clusterech Service Fabric. Služba pojmenování je registrátorem služeb a jejich adresy, na nichž každá instance nebo replika služby naslouchá. OpenAsync(C#) / openAsync(Java) Po dokončení metody ICommunicationListener(C#) / CommunicationListener(Java) se jeho návratová hodnota zaregistruje ve službě pojmenování. Tato návratová hodnota, která se publikuje ve službě pojmenování, je řetězec, jehož hodnota může být vůbec cokoli. Tato řetězcová hodnota je to, co klienti vidí, když chtějí zadat adresu služby ze služby pojmenování.

public Task<string> OpenAsync(CancellationToken cancellationToken)
{
    EndpointResourceDescription serviceEndpoint = serviceContext.CodePackageActivationContext.GetEndpoint("ServiceEndpoint");
    int port = serviceEndpoint.Port;

    this.listeningAddress = string.Format(
                CultureInfo.InvariantCulture,
                "http://+:{0}/",
                port);

    this.publishAddress = this.listeningAddress.Replace("+", FabricRuntime.GetNodeContext().IPAddressOrFQDN);

    this.webApp = WebApp.Start(this.listeningAddress, appBuilder => this.startup.Invoke(appBuilder));

    // the string returned here will be published in the Naming Service.
    return Task.FromResult(this.publishAddress);
}
public CompletableFuture<String> openAsync(CancellationToken cancellationToken)
{
    EndpointResourceDescription serviceEndpoint = serviceContext.getCodePackageActivationContext.getEndpoint("ServiceEndpoint");
    int port = serviceEndpoint.getPort();

    this.publishAddress = String.format("http://%s:%d/", FabricRuntime.getNodeContext().getIpAddressOrFQDN(), port);

    this.webApp = new WebApp(port);
    this.webApp.start();

    /* the string returned here will be published in the Naming Service.
     */
    return CompletableFuture.completedFuture(this.publishAddress);
}

Service Fabric poskytuje rozhraní API, která klientům a dalším službám umožňují zadat tuto adresu podle názvu služby. To je důležité, protože adresa služby není statická. Služby se přesunou v clusteru pro účely vyrovnávání prostředků a dostupnosti. Jedná se o mechanismus, který klientům umožňuje přeložit adresu naslouchání pro službu.

Poznámka:

Kompletní návod, jak napsat naslouchací proces komunikace, najdete v tématu Služby Service Fabric Web API se samoobslužným hostováním OWIN pro C#, zatímco pro Javu můžete napsat vlastní implementaci serveru HTTP, viz příklad aplikace EchoServer na adrese https://github.com/Azure-Samples/service-fabric-java-getting-started.

Komunikace se službou

Rozhraní API Reliable Services poskytuje následující knihovny pro zápis klientů, kteří komunikují se službami.

Překlad koncového bodu služby

Prvním krokem ke komunikaci se službou je vyřešení adresy koncového bodu oddílu nebo instance služby, se kterou chcete komunikovat. Třída ServicePartitionResolver(C#) / FabricServicePartitionResolver(Java) utility je základní primitivní, která pomáhá klientům určit koncový bod služby za běhu. V terminologii Service Fabric se proces určení koncového bodu služby označuje jako překlad koncového bodu služby.

Pro připojení ke službám v rámci clusteru je možné vytvořit ServicePartitionResolver pomocí výchozího nastavení. Toto je doporučené využití pro většinu situací:

ServicePartitionResolver resolver = ServicePartitionResolver.GetDefault();
FabricServicePartitionResolver resolver = FabricServicePartitionResolver.getDefault();

Pokud se chcete připojit ke službám v jiném clusteru, je možné vytvořit ServicePartitionResolver se sadou koncových bodů brány clusteru. Všimněte si, že koncové body brány jsou jenom různé koncové body pro připojení ke stejnému clusteru. Příklad:

ServicePartitionResolver resolver = new  ServicePartitionResolver("mycluster.cloudapp.azure.com:19000", "mycluster.cloudapp.azure.com:19001");
FabricServicePartitionResolver resolver = new  FabricServicePartitionResolver("mycluster.cloudapp.azure.com:19000", "mycluster.cloudapp.azure.com:19001");

Alternativně ServicePartitionResolver můžete dát funkci pro vytvoření interního FabricClient použití:

public delegate FabricClient CreateFabricClientDelegate();
public FabricServicePartitionResolver(CreateFabricClient createFabricClient) {
...
}

public interface CreateFabricClient {
    public FabricClient getFabricClient();
}

FabricClient je objekt, který slouží ke komunikaci s clusterem Service Fabric pro různé operace správy v clusteru. To je užitečné, když chcete mít větší kontrolu nad tím, jak překladač oddílů služby komunikuje s vaším clusterem. FabricClient provádí ukládání do mezipaměti interně a je obecně nákladné k vytvoření, takže je důležité opakovaně používat FabricClient instance co nejvíce.

ServicePartitionResolver resolver = new  ServicePartitionResolver(() => CreateMyFabricClient());
FabricServicePartitionResolver resolver = new  FabricServicePartitionResolver(() -> new CreateFabricClientImpl());

Metoda překladu se pak použije k načtení adresy služby nebo oddílu služby pro dělené služby.

ServicePartitionResolver resolver = ServicePartitionResolver.GetDefault();

ResolvedServicePartition partition =
    await resolver.ResolveAsync(new Uri("fabric:/MyApp/MyService"), new ServicePartitionKey(), cancellationToken);
FabricServicePartitionResolver resolver = FabricServicePartitionResolver.getDefault();

CompletableFuture<ResolvedServicePartition> partition =
    resolver.resolveAsync(new URI("fabric:/MyApp/MyService"), new ServicePartitionKey());

Adresu služby je možné snadno vyřešit pomocí ServicePartitionResolver, ale k zajištění správného použití vyřešené adresy je potřeba více práce. Váš klient musí zjistit, jestli pokus o připojení selhal z důvodu přechodné chyby a může se opakovat (např. přesun služby nebo je dočasně nedostupný) nebo trvalou chybu (např. služba byla odstraněna nebo požadovaný prostředek již neexistuje). Instance služby nebo repliky se můžou kdykoli přesouvat z uzlu na uzel z několika důvodů. Adresa služby vyřešená prostřednictvím ServicePartitionResolver může být zastaralá v době, kdy se váš kód klienta pokusí připojit. V takovém případě klient musí adresu přeložit znovu. Pokud zadáte předchozí ResolvedServicePartition informace, znamená to, že překladač se musí zkusit znovu, a ne jednoduše načíst adresu uloženou v mezipaměti.

Kód klienta obvykle nemusí pracovat s ServicePartitionResolver přímo. Vytvoří se a předá do komunikačních klientských továren v rozhraní API Reliable Services. Továrny používají překladač interně ke generování klientského objektu, který lze použít ke komunikaci se službami.

Komunikační klienti a továrny

Knihovna komunikační továrny implementuje typický vzor opakování zpracování chyb, který usnadňuje opakování připojení k vyřešeným koncovým bodům služby. Knihovna továrny poskytuje mechanismus opakování, když zadáte obslužné rutiny chyb.

ICommunicationClientFactory(C#) / CommunicationClientFactory(Java) definuje základní rozhraní implementované komunikační klientskou továrnou, která vytváří klienty, kteří můžou komunikovat se službou Service Fabric. Implementace CommunicationClientFactory závisí na komunikačním zásobníku používaném službou Service Fabric, kde chce klient komunikovat. Rozhraní API Reliable Services poskytuje CommunicationClientFactoryBase<TCommunicationClient>rozhraní API . Poskytuje základní implementaci rozhraní CommunicationClientFactory a provádí úlohy, které jsou společné pro všechny komunikační zásobníky. (Mezi tyto úlohy patří použití ServicePartitionResolver k určení koncového bodu služby). Klienti obvykle implementují abstraktní třídu CommunicationClientFactoryBase pro zpracování logiky, která je specifická pro komunikační zásobník.

Komunikační klient obdrží adresu a použije ji k připojení ke službě. Klient může použít jakýkoli protokol, který chce.

public class MyCommunicationClient : ICommunicationClient
{
    public ResolvedServiceEndpoint Endpoint { get; set; }

    public string ListenerName { get; set; }

    public ResolvedServicePartition ResolvedServicePartition { get; set; }
}
public class MyCommunicationClient implements CommunicationClient {

    private ResolvedServicePartition resolvedServicePartition;
    private String listenerName;
    private ResolvedServiceEndpoint endPoint;

    /*
     * Getters and Setters
     */
}

Klientská továrna je primárně zodpovědná za vytváření komunikačních klientů. Pro klienty, kteří neudržuje trvalé připojení, jako je klient HTTP, musí továrna pouze vytvořit a vrátit klienta. Další protokoly, které udržují trvalé připojení, jako jsou některé binární protokoly, by také měly být ověřeny (ValidateClient(string endpoint, MyCommunicationClient client)) továrnou, aby bylo možné určit, jestli se připojení musí znovu vytvořit.

public class MyCommunicationClientFactory : CommunicationClientFactoryBase<MyCommunicationClient>
{
    protected override void AbortClient(MyCommunicationClient client)
    {
    }

    protected override Task<MyCommunicationClient> CreateClientAsync(string endpoint, CancellationToken cancellationToken)
    {
    }

    protected override bool ValidateClient(MyCommunicationClient clientChannel)
    {
    }

    protected override bool ValidateClient(string endpoint, MyCommunicationClient client)
    {
    }
}
public class MyCommunicationClientFactory extends CommunicationClientFactoryBase<MyCommunicationClient> {

    @Override
    protected boolean validateClient(MyCommunicationClient clientChannel) {
    }

    @Override
    protected boolean validateClient(String endpoint, MyCommunicationClient client) {
    }

    @Override
    protected CompletableFuture<MyCommunicationClient> createClientAsync(String endpoint) {
    }

    @Override
    protected void abortClient(MyCommunicationClient client) {
    }
}

Nakonec je obslužná rutina výjimky zodpovědná za určení akce, která se má provést, když dojde k výjimce. Výjimky jsou kategorizovány do opakovatelných a neopakovatelných.

  • Neopakovatelné výjimky jednoduše vrátí zpět volajícímu.
  • Opakovatelné výjimky jsou dále kategorizovány do přechodných a non-přechodných.
    • Přechodné výjimky jsou ty, které je možné jednoduše opakovat bez opětovného překladu adresy koncového bodu služby. Ty budou zahrnovat přechodné problémy se sítí nebo odpovědi na chyby služby jiné než ty, které označují, že adresa koncového bodu služby neexistuje.
    • Ne přechodné výjimky jsou výjimky, které vyžadují opětovné vyřešení adresy koncového bodu služby. Patří sem výjimky, které značí, že koncový bod služby nebylo možné dosáhnout, což znamená, že se služba přesunula do jiného uzlu.

Rozhodnutí TryHandleException o dané výjimce. Pokud neví, jaká rozhodnutí o výjimce udělat, měla by vrátit hodnotu false. Pokud ví, jaké rozhodnutí provést, měl by výsledek odpovídajícím způsobem nastavit a vrátit hodnotu true.

class MyExceptionHandler : IExceptionHandler
{
    public bool TryHandleException(ExceptionInformation exceptionInformation, OperationRetrySettings retrySettings, out ExceptionHandlingResult result)
    {
        // if exceptionInformation.Exception is known and is transient (can be retried without re-resolving)
        result = new ExceptionHandlingRetryResult(exceptionInformation.Exception, true, retrySettings, retrySettings.DefaultMaxRetryCount);
        return true;


        // if exceptionInformation.Exception is known and is not transient (indicates a new service endpoint address must be resolved)
        result = new ExceptionHandlingRetryResult(exceptionInformation.Exception, false, retrySettings, retrySettings.DefaultMaxRetryCount);
        return true;

        // if exceptionInformation.Exception is unknown (let the next IExceptionHandler attempt to handle it)
        result = null;
        return false;
    }
}
public class MyExceptionHandler implements ExceptionHandler {

    @Override
    public ExceptionHandlingResult handleException(ExceptionInformation exceptionInformation, OperationRetrySettings retrySettings) {

        /* if exceptionInformation.getException() is known and is transient (can be retried without re-resolving)
         */
        result = new ExceptionHandlingRetryResult(exceptionInformation.getException(), true, retrySettings, retrySettings.getDefaultMaxRetryCount());
        return true;


        /* if exceptionInformation.getException() is known and is not transient (indicates a new service endpoint address must be resolved)
         */
        result = new ExceptionHandlingRetryResult(exceptionInformation.getException(), false, retrySettings, retrySettings.getDefaultMaxRetryCount());
        return true;

        /* if exceptionInformation.getException() is unknown (let the next ExceptionHandler attempt to handle it)
         */
        result = null;
        return false;

    }
}

Spojení všech součástí dohromady

ICommunicationClient(C#) / CommunicationClient(Java)S , ICommunicationClientFactory(C#) / CommunicationClientFactory(Java)a IExceptionHandler(C#) / ExceptionHandler(Java) postaveny kolem komunikačního protokolu, ServicePartitionClient(C#) / FabricServicePartitionClient(Java) obtéká to vše dohromady a poskytuje smyčku řešení chyb a oddíly služby řešení kolem těchto komponent.

private MyCommunicationClientFactory myCommunicationClientFactory;
private Uri myServiceUri;

var myServicePartitionClient = new ServicePartitionClient<MyCommunicationClient>(
    this.myCommunicationClientFactory,
    this.myServiceUri,
    myPartitionKey);

var result = await myServicePartitionClient.InvokeWithRetryAsync(async (client) =>
   {
      // Communicate with the service using the client.
   },
   CancellationToken.None);

private MyCommunicationClientFactory myCommunicationClientFactory;
private URI myServiceUri;

FabricServicePartitionClient myServicePartitionClient = new FabricServicePartitionClient<MyCommunicationClient>(
    this.myCommunicationClientFactory,
    this.myServiceUri,
    myPartitionKey);

CompletableFuture<?> result = myServicePartitionClient.invokeWithRetryAsync(client -> {
      /* Communicate with the service using the client.
       */
   });

Další kroky