Connecting to WebSocket

Rishabh Joshi edited this page Oct 15, 2018 · 4 revisions

Upstox's WebSocket channel provides various types of push updates in near real time. Riko provides you with an API to connect to the WebSocket to receive those push updates.

1. Types of Messages

When you connect to the WebSocket, you receive messages of one of the following types:

  1. TextMessage - A standard text message. Order & Trade Updates from Upstox are of this type.
  2. BinaryMessage - Message in binary form. Needs to be converted to String for further processing. Quote Updates from Upstox are of this type.
  3. ConnectedMessage - Indicates the application is now connected to Upstox WebSocket.
  4. DisconnectedMessage - Indicates the application is now disconnected from Upstox WebSocket. Disconnect Events from Upstox are of this type.
  5. ClosingMessage - Indicates the application is now disconnecting from Upstox WebSocket.
  6. ErrorMessage - Indicates that an error has occurred with the WebSocket connection. Error events from Upstox are of this type.
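As a rough illustration of what converting a binary payload to a String involves, the sketch below decodes raw bytes as text using only the JDK. It assumes the payload is UTF-8 encoded text, which this page does not state. The class name BinaryPayloadDecoder and the sample payload are made up for illustration and are not part of Riko. When working with Riko itself, BinaryMessage::getMessageAsString() is the call to use.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of Riko. */
final class BinaryPayloadDecoder {
    private BinaryPayloadDecoder() {
    }

    /** Decodes the remaining bytes as UTF-8 text (assumed encoding). */
    static String decode(final ByteBuffer payload) {
        // duplicate() shares the bytes but keeps an independent position,
        // so the caller's buffer is left untouched.
        return StandardCharsets.UTF_8.decode(payload.duplicate()).toString();
    }

    public static void main(final String[] args) {
        // Made-up sample payload, for illustration only
        final byte[] raw = "symbol=ABC,ltp=100.5".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(ByteBuffer.wrap(raw)));
    }
}
```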

2. Subscribing to Messages

To receive the above messages, you need to subscribe to them by providing your subscriber(s) when connecting.

Based on your preference, you can have a single subscriber that processes all types of messages, or individual subscribers for each type.

You need to implement the MessageSubscriber interface and pass your implementation to the WebSocketService::connect() method. Riko's internal WebSocket implementation (MessageListener) uses the Java 9 Flow API to publish incoming messages to the interested subscribers (MessageSubscriber).

An example of a single MessageSubscriber, processing all types of messages:

public class UpstoxWebSocketSubscriber implements MessageSubscriber {
    private static final Logger log = LogManager.getLogger(UpstoxWebSocketSubscriber.class);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(final Flow.Subscription subscription) {
        log.info("Subscribed! Ready to receive messages!");
        this.subscription = subscription;
        this.subscription.request(1);
    }

    @Override
    public void onNext(final WebSocketMessage item) {
        if (item instanceof BinaryMessage) {
            log.info("Binary Message: {}", ((BinaryMessage) item).getMessageAsString());
        } else if (item instanceof ConnectedMessage) {
            final ConnectedMessage message = (ConnectedMessage) item;
            log.info("Connected to Upstox: {}", message.getMessage());
        } else if (item instanceof DisconnectedMessage) {
            final DisconnectedMessage message = (DisconnectedMessage) item;
            log.info("Disconnected from Upstox:  Code: {}, Reason: {}", message.getCode(), message.getReason());
        } else if (item instanceof ClosingMessage) {
            final ClosingMessage message = (ClosingMessage) item;
            log.warn("Closing the web-socket connection:  Code: {}, Reason: {}", message.getCode(), message.getReason());
        } else if (item instanceof ErrorMessage) {
            final ErrorMessage message = (ErrorMessage) item;
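            // Reusing 'onError()'. MyCustomException is a placeholder for an exception type of your own.
            onError(new MyCustomException("Error from Upstox: " + message.getReason(), message.getThrowable()));
        } else if (item instanceof TextMessage) {
            final TextMessage message = (TextMessage) item;
            log.info("Text message received: {}", message);
        } else {
            // Avoid a ClassCastException if an unexpected message type arrives
            log.warn("Ignoring unexpected message type: {}", item.getClass().getName());
        }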
        // Ask for the next message (do not miss this line)
        this.subscription.request(1);
    }

    @Override
    public void onError(final Throwable throwable) {
        // Pass the throwable as the last argument, without a placeholder, so the stack trace is logged
        log.fatal("Error occurred", throwable);
    }

    @Override
    public void onComplete() {
        log.info("Subscription is now complete - no more messages from Upstox.");
    }

    @Override
    public String getName() {
        return "MySubscriber"; // Provide a unique name
    }
}
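The request(1) calls in the subscriber above follow the usual one-item-at-a-time demand pattern of the Flow API: the publisher delivers an item only after the subscriber has asked for it. The sketch below shows the same pattern with the JDK's own SubmissionPublisher and plain String items, so it runs without Riko. The class and method names (OneAtATimeDemo, CollectingSubscriber, publishAll) are invented for this illustration, and it is not a description of how Riko's MessageListener is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class OneAtATimeDemo {

    /** Collects items, requesting exactly one at a time. */
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(final Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first item
        }

        @Override
        public void onNext(final String item) {
            received.add(item);
            subscription.request(1); // without this, no further items arrive
        }

        @Override
        public void onError(final Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the items to one subscriber and returns what it received. */
    static List<String> publishAll(final List<String> items) {
        final CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete once the buffered items are delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(final String[] args) {
        System.out.println(publishAll(List.of("first", "second", "third")));
    }
}
```

If the request(1) call in onNext() is removed, the subscriber receives only the first item, which is why the example above warns not to miss that line.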

3. Connecting to the WebSocket

Now that we have the subscriber(s) ready, we can easily connect to the WebSocket as below:

WebSocketService service = new WebSocketService(accessToken, apiCredentials);
WrappedWebSocket socket = service.connect(subscribers);

Note: WebSocketService::connect() takes a List<MessageSubscriber>.

Note: Before connecting to Upstox's WebSocket, Riko automatically retrieves the best connection parameters required for a stable WebSocket connection with Upstox and uses them to make the connection.

Sample of Multiple subscribers

1. Declare your various subscribers

Subscriber 1

    public class TextMessageSubscriber implements MessageSubscriber {...}

Subscriber 2

    public class BinaryMessageSubscriber implements MessageSubscriber {...}

Subscriber 3

    public class ConnectionStatusSubscriber implements MessageSubscriber {...}

2. Prepare a list of subscribers

List<MessageSubscriber> subscribers = new ArrayList<>();
subscribers.add(new TextMessageSubscriber());
subscribers.add(new BinaryMessageSubscriber());
subscribers.add(new ConnectionStatusSubscriber());

3. Connect to the web socket

WebSocketService service = new WebSocketService(accessToken, apiCredentials);
WrappedWebSocket socket = service.connect(subscribers);
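When each subscriber handles only one message type, one possible approach is for it to act on items of its own type, ignore everything else, and still request the next item every time. The sketch below illustrates that idea with the JDK's Flow.Subscriber and Object items so that it compiles without Riko. TypeFilteringSubscriber is a hypothetical helper, not a Riko class. A real Riko subscriber would implement MessageSubscriber instead (which also requires getName(), as in the single-subscriber example) and would check for types such as TextMessage or BinaryMessage.

```java
import java.util.concurrent.Flow;
import java.util.function.Consumer;

/** Hypothetical helper: handles items of one type and ignores the rest. */
final class TypeFilteringSubscriber<T> implements Flow.Subscriber<Object> {
    private final Class<T> type;
    private final Consumer<T> handler;
    private Flow.Subscription subscription;

    TypeFilteringSubscriber(final Class<T> type, final Consumer<T> handler) {
        this.type = type;
        this.handler = handler;
    }

    @Override
    public void onSubscribe(final Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(final Object item) {
        if (type.isInstance(item)) {
            handler.accept(type.cast(item));
        }
        // Request the next item even when this one was ignored
        subscription.request(1);
    }

    @Override
    public void onError(final Throwable throwable) {
        System.err.println("Subscription failed: " + throwable);
    }

    @Override
    public void onComplete() {
        // Nothing to clean up in this sketch
    }
}
```

For example, new TypeFilteringSubscriber<>(String.class, System.out::println) prints every String item it receives and silently skips items of any other type.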