Connecting to WebSocket
Upstox's WebSocket channel provides various types of push updates in near real time. Riko provides you with an API to connect to the WebSocket to receive those push updates.
Once connected to the WebSocket, you receive messages of one of the following types:
- TextMessage - A standard text message. Order & Trade Updates from Upstox are of this type.
- BinaryMessage - Message in binary form. Needs to be converted to String for further processing. Quote Updates from Upstox are of this type.
- ConnectedMessage - Indicates the application is now connected to Upstox WebSocket.
- DisconnectedMessage - Indicates the application is now disconnected from Upstox WebSocket. Disconnect Events from Upstox are of this type.
- ClosingMessage - Indicates the application is now disconnecting from Upstox WebSocket.
- ErrorMessage - Indicates that an error has occurred with the WebSocket connection. Error events from Upstox are of this type.
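In Riko, the subscriber example further down calls getMessageAsString() on a BinaryMessage to do the binary-to-String conversion. For illustration only, here is a minimal sketch of what such a conversion looks like on raw bytes. The BinaryPayloadDecoder class, its decode() method and the choice of UTF-8 are assumptions made for this sketch, not part of Riko or Upstox.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class BinaryPayloadDecoder {

    private BinaryPayloadDecoder() {
    }

    // Decodes the remaining bytes of the buffer as text.
    // Assumption: the payload is UTF-8 encoded.
    static String decode(final ByteBuffer payload) {
        // duplicate() shares the bytes but keeps the caller's position untouched
        return StandardCharsets.UTF_8.decode(payload.duplicate()).toString();
    }

    public static void main(final String[] args) {
        // Hypothetical payload, used only to exercise the method
        final ByteBuffer payload =
                ByteBuffer.wrap("hypothetical,quote,payload".getBytes(StandardCharsets.UTF_8));
        System.out.println(decode(payload));
    }
}
```

Decoding a duplicate of the buffer means the same payload can still be read by other code afterwards.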
To receive the above messages, you need to subscribe to them by providing one or more subscribers when connecting.
Depending on your preference, you can have a single subscriber that processes all types of messages, or individual subscribers for each type.
You need to implement the MessageSubscriber interface and provide its implementation to the WebSocketService::connect() method. The internal WebSocket implementation of Riko (MessageListener) uses the Java 9 Flow API to publish the incoming messages to the interested subscribers (MessageSubscriber).
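To make the Flow API mechanics concrete, here is a small sketch that uses only the JDK, with SubmissionPublisher standing in for the publishing side. It is not Riko's MessageListener; the FlowDemo and CollectingSubscriber names are hypothetical. It shows the one-at-a-time demand pattern: the subscriber calls request(1) in onSubscribe() and again after every onNext().

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class FlowDemo {

    // Collects every item it receives, asking for one item at a time.
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(final Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // nothing is delivered until demand is signalled
        }

        @Override
        public void onNext(final String item) {
            received.add(item);
            subscription.request(1); // ask for the next item
        }

        @Override
        public void onError(final Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes the items to a fresh subscriber and returns what it received.
    static List<String> publishAll(final List<String> items) {
        final CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete after the submitted items
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return subscriber.received;
    }

    public static void main(final String[] args) {
        System.out.println(publishAll(List.of("connected", "order-update", "quote-update")));
    }
}
```

If the request(1) call in onNext() is left out, the subscriber receives the first item and then nothing more, which is why the same call matters in a MessageSubscriber.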
An example of a single MessageSubscriber, processing all types of messages:
import java.util.concurrent.Flow;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
// Riko's own types (MessageSubscriber, WebSocketMessage, TextMessage, ...) must be imported as well.

public class UpstoxWebSocketSubscriber implements MessageSubscriber {

    private static final Logger log = LogManager.getLogger(UpstoxWebSocketSubscriber.class);

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(final Flow.Subscription subscription) {
        log.info("Subscribed! Ready to receive messages!");
        this.subscription = subscription;
        // Ask for the first message
        this.subscription.request(1);
    }

    @Override
    public void onNext(final WebSocketMessage item) {
        if (item instanceof BinaryMessage) {
            log.info("Binary Message: {}", ((BinaryMessage) item).getMessageAsString());
        } else if (item instanceof ConnectedMessage) {
            final ConnectedMessage message = (ConnectedMessage) item;
            log.info("Connected to Upstox: {}", message.getMessage());
        } else if (item instanceof DisconnectedMessage) {
            final DisconnectedMessage message = (DisconnectedMessage) item;
            log.info("Disconnected from Upstox: Code: {}, Reason: {}", message.getCode(), message.getReason());
        } else if (item instanceof ClosingMessage) {
            final ClosingMessage message = (ClosingMessage) item;
            log.warn("Closing the web-socket connection: Code: {}, Reason: {}", message.getCode(), message.getReason());
        } else if (item instanceof ErrorMessage) {
            final ErrorMessage message = (ErrorMessage) item;
            // Reusing 'onError()'. MyCustomException is an application-defined exception.
            onError(new MyCustomException("Error from Upstox: " + message.getReason(), message.getThrowable()));
        } else {
            // Every other type is handled above, so this is a TextMessage
            final TextMessage message = (TextMessage) item;
            log.info("Text message received: {}", message);
        }
        // Ask for the next message (do not miss this line)
        this.subscription.request(1);
    }

    @Override
    public void onError(final Throwable throwable) {
        // Pass the throwable as the last argument, without a placeholder, so the stack trace is logged
        log.fatal("Error occurred", throwable);
    }

    @Override
    public void onComplete() {
        log.info("Subscription is now complete - no more messages from Upstox.");
    }

    @Override
    public String getName() {
        return "MySubscriber"; // Provide a unique name
    }
}
Now that we have the subscriber(s) ready, we can easily connect to the WebSocket as below:
WebSocketService service = new WebSocketService(accessToken, apiCredentials);
WrappedWebSocket socket = service.connect(subscribers);
Note: WebSocketService::connect() takes a List<MessageSubscriber>.
Note: Before connecting to Upstox's WebSocket, Riko automatically retrieves the connection parameters needed for a stable WebSocket connection with Upstox and uses them to make the connection.
An example using individual subscribers for each type of message:
public class TextMessageSubscriber implements MessageSubscriber {...}
public class BinaryMessageSubscriber implements MessageSubscriber {...}
public class ConnectionStatusSubscriber implements MessageSubscriber {...}
List<MessageSubscriber> subscribers = new ArrayList<>();
subscribers.add(new TextMessageSubscriber());
subscribers.add(new BinaryMessageSubscriber());
subscribers.add(new ConnectionStatusSubscriber());
WebSocketService service = new WebSocketService(accessToken, apiCredentials);
WrappedWebSocket socket = service.connect(subscribers);
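What goes inside a per-type subscriber is left open above. One possible shape, sketched here with JDK types only, is a subscriber that handles the type it cares about and skips everything else, while still requesting the next message each time. The sketch assumes that every subscriber is offered every message. The Message, Text and Binary classes are hypothetical stand-ins declared locally so the code runs without Riko. They are not Riko's WebSocketMessage, TextMessage or BinaryMessage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class TypeFilterDemo {

    // Hypothetical stand-ins for the library's message types.
    interface Message {
    }

    static final class Text implements Message {
        final String body;

        Text(final String body) {
            this.body = body;
        }
    }

    static final class Binary implements Message {
        final byte[] payload;

        Binary(final byte[] payload) {
            this.payload = payload;
        }
    }

    // Handles only Text messages; other types are ignored but demand is still renewed.
    static final class TextOnlySubscriber implements Flow.Subscriber<Message> {
        final List<String> handled = new ArrayList<>();
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(final Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(final Message item) {
            if (item instanceof Text) {
                handled.add(((Text) item).body);
            }
            // Request the next message even when this one was skipped
            subscription.request(1);
        }

        @Override
        public void onError(final Throwable throwable) {
            // No error handling needed for this sketch
        }

        @Override
        public void onComplete() {
            // Nothing to clean up in this sketch
        }
    }

    // Feeds the messages to a TextOnlySubscriber synchronously and returns the handled bodies.
    static List<String> deliver(final List<Message> messages) {
        final TextOnlySubscriber subscriber = new TextOnlySubscriber();
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(final long n) {
                // Demand is ignored: this driver pushes the messages directly
            }

            @Override
            public void cancel() {
                // Nothing to cancel
            }
        });
        messages.forEach(subscriber::onNext);
        subscriber.onComplete();
        return subscriber.handled;
    }

    public static void main(final String[] args) {
        System.out.println(deliver(List.of(
                new Text("order"), new Binary(new byte[] {1}), new Text("trade"))));
    }
}
```

Skipping a message without calling request(1) would stall the subscriber, so the request stays outside the type check.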