Subscribing and processing of ServiceReference's events #331

Open
segabriel opened this issue Sep 8, 2018 · 3 comments

Comments

@segabriel
Member

First of all, what is a ServiceReference? It is a flat structure that describes each method of each service on a given endpoint.

I have two different cases in the exchange project.

  1. I want to know when some qualifier becomes available in the cluster so that I can invoke (subscribe to) it. It doesn't matter how many microservices provide it, as long as there is at least one.
    Right now I have two approaches to achieve this:
Microservices ms = ...
GreetingService greetingService = ms.call().create().api(GreetingService.class);

ms.discovery().listen()
  .filter(DiscoveryEvent::isRegistered)
  .flatMap(discoveryEvent -> 
    Flux.fromIterable(discoveryEvent.serviceEndpoint().serviceRegistrations()))
  .filter(serviceRegistration ->
    serviceRegistration.namespace().equals(GreetingService.SERVICE_NAME))
  .take(1)
  .subscribe($ -> greetingService.invoke().subscribe());

and the second one (note that it will work correctly only after #330 is merged):

Microservices ms = ...
GreetingService greetingService = ms.call().create().api(GreetingService.class);

ms.serviceRegistry().listen()
  .filter(RegistryEvent::isAdded)
  .filter(registryEvent -> 
    registryEvent.serviceReference().namespace().equals(GreetingService.SERVICE_NAME))
  .take(1)
  .subscribe($ -> greetingService.invoke().subscribe());

These two approaches are very similar, so I think we should keep only one of them: they duplicate logic (two publishers do the same thing). We also shouldn't expose the discovery instance at all, because it has a lot of important methods with side effects, such as start and shutdown.
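
To illustrate the point about not exposing the discovery instance, here is a rough JDK-only sketch. All types in it (`RegistryEvents`, `Discovery`, `RegistryEvent`, `firstAvailable`) are hypothetical stand-ins made up for this example and are not the scalecube API. User code only gets a listen-only view, while life-cycle methods such as `shutdown` stay on the internal component; `firstAvailable` mimics the `take(1)` behaviour from the snippets above.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stand-ins for illustration only; not the scalecube API. */
class RegistryFacadeSketch {

  /** Minimal event: a service namespace was added to or removed from the registry. */
  static final class RegistryEvent {
    private final String namespace;
    private final boolean added;

    RegistryEvent(String namespace, boolean added) {
      this.namespace = namespace;
      this.added = added;
    }

    String namespace() {
      return namespace;
    }

    boolean added() {
      return added;
    }
  }

  /** Listen-only view handed to user code: no start or shutdown here. */
  interface RegistryEvents {
    void listen(Consumer<RegistryEvent> listener);
  }

  /** Full component; its life-cycle methods are not reachable through RegistryEvents. */
  static final class Discovery implements RegistryEvents {
    private final List<Consumer<RegistryEvent>> listeners = new CopyOnWriteArrayList<>();

    @Override
    public void listen(Consumer<RegistryEvent> listener) {
      listeners.add(listener);
    }

    void publish(RegistryEvent event) {
      listeners.forEach(listener -> listener.accept(event));
    }

    void shutdown() {
      listeners.clear();
    }
  }

  /** Completes with the first "added" event for the namespace; later matches are ignored. */
  static CompletableFuture<RegistryEvent> firstAvailable(RegistryEvents events, String namespace) {
    CompletableFuture<RegistryEvent> result = new CompletableFuture<>();
    events.listen(event -> {
      if (event.added() && event.namespace().equals(namespace)) {
        result.complete(event); // complete() has no effect after the first successful call
      }
    });
    return result;
  }
}
```

A caller would then chain the actual invocation on the returned future, for example with `thenAccept`, without ever seeing `Discovery` itself.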

  2. I want to subscribe to all events from certain services, where several instances of such a service run in the cluster, and I want to use our service proxy for that. Currently the proxy uses a router strategy and picks a single ServiceReference to invoke by itself. Imagine we have several equivalent services that produce events, and a new service should aggregate them into one stream. In a nutshell, you want to subscribe to all events from all services with the same qualifier. How can I achieve that? I know that a ServiceReference contains a host and a port, so I'd like to do something like this:
private final FluxSink<Event> sink = subject.serialize().sink();
...

Microservices ms = ...

ms.serviceRegistry().listen()
  .filter(RegistryEvent::isAdded)
  .map(RegistryEvent::serviceReference)
  .filter(serviceReference -> 
    serviceReference.namespace().equals(GreetingService.SERVICE_NAME))
  .subscribe(serviceReference ->
     ms.call().create()
        .serviceReference(serviceReference)
        .api(GreetingService.class)
        .invoke()
        .subscribe(sink::next)); // forward each event to the shared sink declared above

Of course, I may also need to subscribe to RegistryEvent::isRemoved to manage the life cycle of the created proxies. It might be better if all of that (life-cycle management, tracking when an instance becomes available or disconnects, aggregation, and completing once all streams have completed) lived inside ServiceCall, so that I could just write something like this:

Microservices ms = ...

ms.call().create()
  .failFast(false) 
        // The fail-fast principle, applied after `subscribe` is invoked, for both `Flux` and `Mono`:
        // should the call wait until this qualifier becomes available or not?
  .subscribeToAll(true) // only for `Flux`
  .api(GreetingService.class)
  .invoke()
  .subscribe(event -> ...);

This could also solve the first case (via failFast).
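
The life-cycle bookkeeping that such a `subscribeToAll` option would have to hide could look roughly like the sketch below. It is JDK-only, and every name in it (`Aggregator`, `InstanceConnector`, `Subscription`, `onAdded`, `onRemoved`) is a hypothetical stand-in rather than an existing scalecube type. It keeps one subscription per instance address, merges all events into one sink, and cancels the subscription when the instance is removed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical sketch for illustration only; not the scalecube API. */
class SubscribeToAllSketch {

  /** Handle for cancelling one per-instance subscription. */
  interface Subscription {
    void cancel();
  }

  /** Opens a stream to one concrete instance (e.g. "host:port") and pushes its events to the sink. */
  interface InstanceConnector<E> {
    Subscription connect(String address, Consumer<E> sink);
  }

  /** Keeps one subscription per known instance and merges all events into a single sink. */
  static final class Aggregator<E> {
    private final Map<String, Subscription> active = new ConcurrentHashMap<>();
    private final InstanceConnector<E> connector;
    private final Consumer<E> sink;

    Aggregator(InstanceConnector<E> connector, Consumer<E> sink) {
      this.connector = connector;
      this.sink = sink;
    }

    /** To be called for the equivalent of RegistryEvent::isAdded; repeated adds are ignored. */
    void onAdded(String address) {
      active.computeIfAbsent(address, a -> connector.connect(a, sink));
    }

    /** To be called for the equivalent of RegistryEvent::isRemoved. */
    void onRemoved(String address) {
      Subscription subscription = active.remove(address);
      if (subscription != null) {
        subscription.cancel();
      }
    }

    int activeCount() {
      return active.size();
    }
  }
}
```

Completion handling (finishing the merged stream once every source has completed) and the failFast decision are deliberately left out of this sketch.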

@ronenhamias
Member

ronenhamias commented Sep 9, 2018

I suggest keeping services as they are, since they fit the request-reply and request-stream patterns well.

For cases where we want to subscribe, publish, and manage the life cycle of a subscription, I suggest a pub/sub approach in which we describe not a ServiceMethod but a @ServiceTopic(name).

In that case we mark the method as having specific behavior that is distinct from service calls.

@ServiceTopic("my-topic-name")
Flux<MyEvent> someTopicEvents();

ServiceCall.api(...).subscribe("my-topic-name")
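
As a rough idea of how such a marker could be resolved at runtime, here is a JDK-only sketch. The `@ServiceTopic` annotation is only the proposal from this comment, and `QuoteService` and `topicsOf` are hypothetical names for this example; the topic method returns `Object` instead of `Flux<MyEvent>` only to avoid a Reactor dependency here.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch for illustration only; not the scalecube API. */
class ServiceTopicSketch {

  /** Proposed marker: flags a method as a pub/sub topic instead of a regular service call. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface ServiceTopic {
    String value();
  }

  /** Example service interface with one topic method and one regular method. */
  interface QuoteService {
    @ServiceTopic("my-topic-name")
    Object someTopicEvents();

    Object regularRequestReply(String request);
  }

  /** Maps topic name to method, so that a subscribe("name") call could be routed to it. */
  static Map<String, Method> topicsOf(Class<?> serviceInterface) {
    Map<String, Method> topics = new TreeMap<>();
    for (Method method : serviceInterface.getMethods()) {
      ServiceTopic topic = method.getAnnotation(ServiceTopic.class);
      if (topic != null) {
        topics.put(topic.value(), method);
      }
    }
    return topics;
  }
}
```

Methods without the annotation are skipped, so they would continue to be treated as ordinary request-reply or request-stream calls.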

@ronenhamias
Member

ronenhamias commented Sep 9, 2018

Motivation:

Let's imagine we have two services in our cluster; each service might go online or offline at any point in time.

Service A depends on service B, so we want to subscribe to service B when it appears in the cluster and wait for it to stream data to service A.

Currently this can be done using discovery events from the service registry and service discovery.

To reduce user boilerplate, we can offer a handler on the service interface that delivers events about the service's dependencies in the cluster.

Proposed Solution

Provide another way for users to react to discovery events: declare handlers on the service interface for the service they want to "trap".

public interface MyService {

   @Discovery(MyService.class)
   void onDiscovery(Flux<DiscoveryEvent> events);

}

public class MyServiceImpl implements MyService {
  private Disposable disposable;

  @Override
  public void onDiscovery(Flux<DiscoveryEvent> events) {
    events
        .filter(DiscoveryEvent::isRegistered)
        // Start the dependent call once the service is registered; keep the handle for disposal.
        .subscribe(event -> disposable = event.service().sayHello(someRequest).subscribe());
  }
}

Alternative approach:

call.discovery(MyService.class)
    .filter(DiscoveryEvent::isRegistered)
    .subscribe(event ->
        disposable = event.service().sayHello(someRequest).subscribe());

Scalecube currently covers the following communication patterns between services:

request response
request many
request channel

After giving it some thought, I think scalecube does not cover the pub/sub communication pattern, such as fanout, multicast, unicast, and topic-based communication between services.

Having said that, maybe we should consider a Publisher / Consumer architecture for cases where we want to broadcast messages to subscribers.

An example scenario is broadcasting stock symbols to multiple subscribers.

Although this can be done with RSocket, I think the communication patterns RSocket offers (mentioned above) are less intuitive in such cases than, for example, Aeron consumers / publishers. In addition, the current architecture of these communication patterns fits well when you deal with streams, but less well when you have to manage multiple subscribers in the cluster and emit to each subscriber one by one to create the effect of fanout, for example, because that forces you to write boilerplate to handle retries and failure cases.
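
To make the boilerplate concrete, a hand-rolled fanout might look roughly like this JDK-only sketch. `ManualFanoutSketch` and its methods are hypothetical names for illustration, not scalecube, RSocket, or Aeron APIs, and the failure policy (drop a subscriber on its first failure, no retry) is just an assumption to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical sketch for illustration only: fanout by emitting to each subscriber in turn. */
class ManualFanoutSketch<T> {
  private final Map<String, Consumer<T>> subscribers = new ConcurrentHashMap<>();

  void subscribe(String id, Consumer<T> subscriber) {
    subscribers.put(id, subscriber);
  }

  /** Emits to every subscriber one by one; returns the ids of subscribers that failed and were dropped. */
  List<String> publish(T message) {
    List<String> failed = new ArrayList<>();
    subscribers.forEach((id, subscriber) -> {
      try {
        subscriber.accept(message);
      } catch (RuntimeException e) {
        failed.add(id); // assumed policy: no retry, the subscriber is dropped below
      }
    });
    subscribers.keySet().removeAll(failed);
    return failed;
  }

  int subscriberCount() {
    return subscribers.size();
  }
}
```

Even this minimal version has to track subscribers and decide what to do on failure, which is the kind of code a broadcast-oriented transport would make unnecessary.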

On the other hand, Aeron's publisher / subscriber model offers the notion of broadcasting, which is more intuitive when using a UDP transport. In these cases there is less need to manage logical streams on top of a single connection, so the value of RSocket is questionable.

Maybe we should consider offering a PubSub layer on top of the Aeron transport, tied to the scalecube cluster, which would coordinate the nodes in a distributed microservices architecture.

In that case we can offer another transport and client, similar to what we currently have with ServiceCall and the RSocket patterns.

For PubSub patterns we can provide alternative channels for broadcasting messages and for managing discovery and failures in the cluster, on top of a low-latency, high-volume transport such as Aeron.

Please also see:
https://github.com/scalecube/scalecube-pubsub

@segabriel
Member Author

Yeah, the idea with @ServiceTopic("my-topic-name") and https://github.com/scalecube/scalecube-pubsub would solve the second case completely.
As for the first case, I still think we shouldn't expose the discovery instance from Microservices.

2 participants