Skip to content

Commit

Permalink
#981 Added first cut of the OmsApi into example code.
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisjstevo committed Nov 20, 2023
1 parent 9e30b2c commit dcea3f7
Show file tree
Hide file tree
Showing 12 changed files with 379 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.finos.vuu.core.module.basket.service.{BasketService, BasketTradingCon
import org.finos.vuu.core.module.price.PriceModule
import org.finos.vuu.core.module.{DefaultModule, ModuleFactory, TableDefContainer, ViewServerModule}
import org.finos.vuu.core.table.Columns
import org.finos.vuu.order.oms.OmsApi

object BasketModule extends DefaultModule {

Expand All @@ -23,6 +24,8 @@ object BasketModule extends DefaultModule {

import org.finos.vuu.core.module.basket.BasketModule.{BasketColumnNames => B, BasketConstituentColumnNames => BC, BasketTradingColumnNames => BT, BasketTradingConstituentColumnNames => BTC, PriceStrategy => PS}

val omsApi = OmsApi()

ModuleFactory.withNamespace(NAME)
.addTable(
//this table should contain one row for each of .FTSE, .DJI, .HSI, .etc...
Expand Down Expand Up @@ -77,14 +80,16 @@ object BasketModule extends DefaultModule {
BTC.Algo.string(), BTC.AlgoParams.string(),
BTC.PctFilled.double(), BTC.Weighting.double(),
BTC.PriceSpread.int(),
BTC.LimitPrice.double()
BTC.LimitPrice.double(),
BTC.FilledQty.long(),
BTC.OrderStatus.string()
),// we can join to instruments and other tables to get the rest of the data.....
VisualLinks(
Link(BTC.InstanceId, BasketTradingTable, BT.InstanceId),
),
joinFields = BTC.InstanceIdRic, BTC.Ric
),
(table, vs) => new NullProvider(table),
(table, vs) => new BasketTradingConstituentProvider(table, omsApi),
(table, _, _, tableContainer) => ViewPortDef(
columns = table.getTableDef.columns,
service = new BasketTradingConstituentService(table, tableContainer)
Expand Down Expand Up @@ -192,6 +197,8 @@ object BasketModule extends DefaultModule {
final val PctFilled = "pctFilled"
final val Weighting = "weighting"
final val PriceSpread = "priceSpread"
final val OrderStatus = "orderStatus"
final val FilledQty = "filledQty"
}

object Sides{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.finos.vuu.core.module.basket.provider

import org.finos.toolbox.lifecycle.LifecycleContainer
import org.finos.toolbox.thread.LifeCycleRunner
import org.finos.toolbox.time.Clock
import org.finos.vuu.core.module.basket.service.OrderStates
import org.finos.vuu.core.table.{DataTable, RowWithData}
import org.finos.vuu.order.oms._
import org.finos.vuu.provider.DefaultProvider

class BasketTradingConstituentProvider(val table: DataTable, val omsApi: OmsApi)(implicit lifecycle: LifecycleContainer, clock: Clock) extends DefaultProvider {

import org.finos.vuu.core.module.basket.BasketModule.{BasketTradingConstituentColumnNames => BTC}

val runner = new LifeCycleRunner("TradingConsProviderRunner", runOnce, 50L)

omsApi.addListener(new OmsListener {
override def onAck(ack: Ack): Unit = {
table.processUpdate(ack.clientOrderId, RowWithData(ack.clientOrderId, Map[String, Any](BTC.OrderStatus -> OrderStates.ACKED)),clock.now())
}
override def onCancelAck(ack: CancelAck): Unit = ???
override def onReplaceAck(ack: ReplaceAck): Unit = ???
override def onFill(fill: Fill): Unit = {
val state = if(fill.fillQty == fill.totalFilledQty) OrderStates.FILLED else OrderStates.ACKED
table.processUpdate(fill.clientOrderId,
RowWithData(fill.clientOrderId, Map[String, Any](BTC.InstanceIdRic -> fill.clientOrderId,
BTC.FilledQty -> fill.fillQty, BTC.OrderStatus -> state))
,clock.now())
}
})

def runOnce(): Unit = {
omsApi.runOnce()
}


override val lifecycleId: String = "org.finos.vuu.core.module.basket.provider.BasketTradingConstituentProvider#" + hashCode()
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ trait BasketServiceIF{

class BasketService(val table: DataTable, val tableContainer: TableContainer)(implicit clock: Clock) extends RpcHandler with BasketServiceIF with StrictLogging {

//private val counter = new AtomicInteger(0)

import org.finos.vuu.core.module.basket.BasketModule.{BasketConstituentColumnNames => BC, BasketTradingColumnNames => BT, BasketTradingConstituentColumnNames => BTC}

private def getAndPadCounter(session: ClientSessionId): String = {
Expand All @@ -49,6 +47,8 @@ class BasketService(val table: DataTable, val tableContainer: TableContainer)(im
BTC.Weighting -> weighting,
BTC.PriceStrategyId -> 2,
BTC.Algo -> -1,
BTC.OrderStatus -> OrderStates.PENDING,
BTC.FilledQty -> 0
))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import org.finos.vuu.core.module.basket.BasketModule.BasketTradingConstituentCol
import org.finos.vuu.core.table.{DataTable, RowWithData, TableContainer}
import org.finos.vuu.net.ClientSessionId
import org.finos.vuu.net.rpc.{EditRpcHandler, RpcHandler}
import org.finos.vuu.order.oms.OmsApi
import org.finos.vuu.viewport._

class BasketTradingConstituentService(val table: DataTable, val tableContainer: TableContainer)(implicit clock: Clock) extends RpcHandler with EditRpcHandler with StrictLogging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@ import com.typesafe.scalalogging.StrictLogging
import org.finos.toolbox.time.Clock
import org.finos.vuu.core.module.basket.BasketModule.{BasketTradingConstituentTable, Sides}
import org.finos.vuu.core.table.{DataTable, RowWithData, TableContainer}
import org.finos.vuu.net.ClientSessionId
import org.finos.vuu.net.{ClientSessionId, RequestContext}
import org.finos.vuu.net.rpc.{EditRpcHandler, RpcHandler}
import org.finos.vuu.viewport.{ViewPort, ViewPortAddRowAction, ViewPortDeleteCellAction, ViewPortDeleteRowAction, ViewPortEditAction, ViewPortEditCellAction, ViewPortEditRowAction, ViewPortEditSuccess, ViewPortFormCloseAction, ViewPortFormSubmitAction}

trait BasketTradingServiceIF extends EditRpcHandler{
import org.finos.vuu.viewport.{NoAction, ViewPort, ViewPortAction, ViewPortAddRowAction, ViewPortDeleteCellAction, ViewPortDeleteRowAction, ViewPortEditAction, ViewPortEditCellAction, ViewPortEditRowAction, ViewPortEditSuccess, ViewPortFormCloseAction, ViewPortFormSubmitAction}

trait BasketTradingServiceIF extends EditRpcHandler {
def sendToMarket(basketInstanceId: String)(ctx: RequestContext): ViewPortAction
}

class BasketTradingService(val table: DataTable, val tableContainer: TableContainer)(implicit clock: Clock) extends RpcHandler with BasketTradingServiceIF with StrictLogging {

import org.finos.vuu.core.module.basket.BasketModule.{BasketConstituentColumnNames => BC, BasketTradingColumnNames => BT, BasketTradingConstituentColumnNames => BTC}

override def sendToMarket(basketInstanceId: String)(ctx: RequestContext): ViewPortAction = {
//table.
NoAction()
}

private def onEditCell(key: String, columnName: String, data: Any, vp: ViewPort, session: ClientSessionId): ViewPortEditAction = {
logger.info("Changing cell value for key:" + key + "(" + columnName + ":" + data + ")")
table.processUpdate(key, RowWithData(key, Map(BT.InstanceId -> key, columnName -> data)), clock.now())
Expand All @@ -24,11 +29,11 @@ class BasketTradingService(val table: DataTable, val tableContainer: TableContai
case BT.Units =>
val constituentTable = tableContainer.getTable(BasketTradingConstituentTable)
val constituents = constituentTable.primaryKeys.map(key => constituentTable.pullRow(key)).filter(_.get(BTC.InstanceId) == key)
constituents.foreach( row => {
val unitsAsInt = data.asInstanceOf[Int]
val weighting = row.get(BTC.Weighting)
val quantity = (weighting.asInstanceOf[Double] * unitsAsInt).toLong
constituentTable.processUpdate(row.key(), RowWithData(row.key(), Map(BTC.InstanceIdRic -> row.key(), BTC.Quantity -> quantity)), clock.now())
constituents.foreach(row => {
val unitsAsInt = data.asInstanceOf[Int]
val weighting = row.get(BTC.Weighting)
val quantity = (weighting.asInstanceOf[Double] * unitsAsInt).toLong
constituentTable.processUpdate(row.key(), RowWithData(row.key(), Map(BTC.InstanceIdRic -> row.key(), BTC.Quantity -> quantity)), clock.now())
})
case BT.Side =>
val constituentTable = tableContainer.getTable(BasketTradingConstituentTable)
Expand All @@ -46,18 +51,21 @@ class BasketTradingService(val table: DataTable, val tableContainer: TableContai
}

ViewPortEditSuccess()
}
}

override def deleteRowAction(): ViewPortDeleteRowAction = ???

override def deleteCellAction(): ViewPortDeleteCellAction = ???

override def addRowAction(): ViewPortAddRowAction = ???

override def editCellAction(): ViewPortEditCellAction = ViewPortEditCellAction("", onEditCell)

override def editRowAction(): ViewPortEditRowAction = ???

override def onFormSubmit(): ViewPortFormSubmitAction = ???

override def onFormClose(): ViewPortFormCloseAction = ???


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.finos.vuu.core.module.basket.service

object OrderStates {
final val PENDING = "PENDING"
final val ACKED = "ACKED"
final val CANCELLED = "CANCELLED"
final val FILLED = "FILLED"
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ class BasketMutateOffMarketTest extends VuuServerTestCase {

assertVpEq(combineQsForVp(vpBasketTradingCons)) {
Table(
("quantity", "side", "instanceId", "instanceIdRic", "basketId", "ric", "description", "notionalUsd", "notionalLocal", "venue", "algo", "algoParams", "pctFilled", "weighting", "priceSpread", "limitPrice", "priceStrategyId"),
(10L, "Buy", "chris-001", "chris-001.BP.L", ".FTSE", "BP.L", "Beyond Petroleum", null, null, null, -1, null, null, 0.1, null, null, 2),
(10L, "Sell", "chris-001", "chris-001.BT.L", ".FTSE", "BT.L", "British Telecom", null, null, null, -1, null, null, 0.1, null, null, 2),
(10L, "Buy", "chris-001", "chris-001.VOD.L", ".FTSE", "VOD.L", "Vodafone", null, null, null, -1, null, null, 0.1, null, null, 2)
("quantity", "side", "instanceId", "instanceIdRic", "basketId", "ric", "description", "notionalUsd", "notionalLocal", "venue", "algo", "algoParams", "pctFilled", "weighting", "priceSpread", "limitPrice", "priceStrategyId", "filledQty", "orderStatus"),
(10L, "Buy", "chris-001", "chris-001.BP.L", ".FTSE", "BP.L", "Beyond Petroleum", null, null, null, -1, null, null, 0.1, null, null, 2, 0, "PENDING"),
(10L, "Sell", "chris-001", "chris-001.BT.L", ".FTSE", "BT.L", "British Telecom", null, null, null, -1, null, null, 0.1, null, null, 2, 0, "PENDING"),
(10L, "Buy", "chris-001", "chris-001.VOD.L", ".FTSE", "VOD.L", "Vodafone", null, null, null, -1, null, null, 0.1, null, null, 2, 0, "PENDING")
)
}

Expand All @@ -118,10 +118,10 @@ class BasketMutateOffMarketTest extends VuuServerTestCase {
And("assert the basket trading constituent table has flipped sides also")
assertVpEq(filterByVp(vpBasketTradingCons,updates)) {
Table(
("quantity", "side", "instanceId", "instanceIdRic", "basketId", "ric", "description", "notionalUsd", "notionalLocal", "venue", "algo", "algoParams", "pctFilled", "weighting", "priceSpread", "limitPrice", "priceStrategyId"),
(10L, "Sell", "chris-001", "chris-001.BP.L", ".FTSE", "BP.L", "Beyond Petroleum", null, null, null, -1, null, null, 0.1, null, null, 2),
(10L, "Buy", "chris-001", "chris-001.BT.L", ".FTSE", "BT.L", "British Telecom", null, null, null, -1, null, null, 0.1, null, null, 2),
(10L, "Sell", "chris-001", "chris-001.VOD.L", ".FTSE", "VOD.L", "Vodafone", null, null, null, -1, null, null, 0.1, null, null, 2)
("quantity", "side", "instanceId", "instanceIdRic", "basketId", "ric", "description", "notionalUsd", "notionalLocal", "venue", "algo", "algoParams", "pctFilled", "weighting", "priceSpread", "limitPrice", "priceStrategyId", "filledQty", "orderStatus"),
(10L, "Sell", "chris-001", "chris-001.BP.L", ".FTSE", "BP.L", "Beyond Petroleum", null, null, null, -1, null, null, 0.1, null, null, 2, 0, "PENDING"),
(10L, "Buy", "chris-001", "chris-001.BT.L", ".FTSE", "BT.L", "British Telecom", null, null, null, -1, null, null, 0.1, null, null, 2, 0, "PENDING"),
(10L, "Sell", "chris-001", "chris-001.VOD.L", ".FTSE", "VOD.L", "Vodafone", null, null, null, -1, null, null, 0.1, null, null, 2, 0, "PENDING")
)
}

Expand All @@ -131,10 +131,10 @@ class BasketMutateOffMarketTest extends VuuServerTestCase {
And("assert the basket trading constituent table has increased the units")
assertVpEq(filterByVp(vpBasketTradingCons, combineQs(vpBasketTrading))) {
Table(
("quantity", "side", "instanceId", "instanceIdRic", "basketId", "ric", "description", "notionalUsd", "notionalLocal", "venue", "algo", "algoParams", "pctFilled", "weighting", "priceSpread", "limitPrice", "priceStrategyId"),
(100L, "Sell", "chris-001", "chris-001.BP.L", ".FTSE", "BP.L", "Beyond Petroleum", null, null, null, -1, null, null, 0.1, null, null, 2),
(100L, "Buy", "chris-001", "chris-001.BT.L", ".FTSE", "BT.L", "British Telecom", null, null, null, -1, null, null, 0.1, null, null, 2),
(100L, "Sell", "chris-001", "chris-001.VOD.L", ".FTSE", "VOD.L", "Vodafone", null, null, null, -1, null, null, 0.1, null, null, 2)
("quantity", "side", "instanceId", "instanceIdRic", "basketId", "ric", "description", "notionalUsd", "notionalLocal", "venue", "algo", "algoParams", "pctFilled", "weighting", "priceSpread", "limitPrice", "priceStrategyId", "filledQty", "orderStatus"),
(100L, "Sell", "chris-001", "chris-001.BP.L", ".FTSE", "BP.L", "Beyond Petroleum", null, null, null, -1, null, null, 0.1, null, null, 2, 0, "PENDING"),
(100L, "Buy", "chris-001", "chris-001.BT.L", ".FTSE", "BT.L", "British Telecom", null, null, null, -1, null, null, 0.1, null, null, 2, 0, "PENDING"),
(100L, "Sell", "chris-001", "chris-001.VOD.L", ".FTSE", "VOD.L", "Vodafone", null, null, null, -1, null, null, 0.1, null, null, 2, 0, "PENDING")
)
}
}
Expand Down
9 changes: 9 additions & 0 deletions example/order/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@
<version>0.9.36-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>org.finos.vuu</groupId>
<artifactId>vuu</artifactId>
<version>0.9.36-SNAPSHOT</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.finos.vuu</groupId>
<artifactId>permission</artifactId>
Expand Down
39 changes: 39 additions & 0 deletions example/order/src/main/scala/org/finos/vuu/order/oms/OmsApi.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.finos.vuu.order.oms

import org.finos.toolbox.time.Clock
import org.finos.vuu.order.oms.impl.InMemOmsApi

object MaxTimes{
final val MAX_ACK_TIME_MS = 5_000
final val MAX_FILL_TIME_MS = 8_000
}

case class NewOrder(symbol: String, qty: Long, price: Double, clientOrderId: String)
case class ReplaceOrder(orderId: Int, newPrice: Double, newQty: Long)
case class CancelOrder(orderId: Int)
case class Ack(orderId: Int, clientOrderId: String, symbol: String, qty: Long, price: Double)
case class CancelAck(orderId: Int, clientOrderId: String)
case class ReplaceAck(orderId: Int, clientOrderId: String)
case class Fill(orderid: Int, fillQty: Long, fillPrice: Double, clientOrderId: String, totalFilledQty: Long)

trait OmsApi {
def createOrder(newOrder: NewOrder): Unit
def replaceOrder(replaceOrder: ReplaceOrder): Unit
def cancelOrder(cancelOrder: CancelOrder): Unit
def addListener(omsListener: OmsListener): Unit
def runOnce(): Unit
def containsOrder(clientOrderId: String): Boolean
}

trait OmsListener{
def onAck(ack: Ack): Unit
def onCancelAck(ack: CancelAck): Unit
def onReplaceAck(ack: ReplaceAck): Unit
def onFill(fill: Fill): Unit
}

object OmsApi{
def apply()(implicit clock: Clock): OmsApi = {
new InMemOmsApi()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package org.finos.vuu.order.oms.impl

import org.finos.toolbox.time.Clock
import org.finos.vuu.order.oms.impl.States.PENDING_ACK
import org.finos.vuu.order.oms.{Ack, CancelOrder, Fill, NewOrder, OmsApi, OmsListener, ReplaceOrder}

import java.util.concurrent.atomic.AtomicInteger
import scala.util.Random

object States {
def ACKED = 'A'

def PENDING_ACK = '~'

def CANCELLED = 'X'

def PENDING_REPLACE = 'R'

def FILLED = 'F'
}

case class InMemOrderState(symbol: String, qty: Long, price: Double, clientOrderId: String, state: Char, nextEventTime: Long, orderId: Int, filledQty: Long)

object OrderId {
private val orderId = new AtomicInteger(0)
def nextOrderId(): Int = orderId.incrementAndGet()
}

class InMemOmsApi(implicit val clock: Clock) extends OmsApi {

private final val random = new Random(clock.now())

@volatile private var orders = List[InMemOrderState]()
@volatile private var listeners = List[OmsListener]()


override def containsOrder(clientOrderId: String): Boolean = orders.exists(_.clientOrderId == clientOrderId)

override def createOrder(newOrder: NewOrder): Unit = {
orders = orders ++ List(InMemOrderState(newOrder.symbol, newOrder.qty, newOrder.price,
newOrder.clientOrderId, States.PENDING_ACK, clock.now() + random.between(1, 1000), OrderId.nextOrderId(), 0L))
}

override def replaceOrder(replaceOrder: ReplaceOrder): Unit = ???

override def cancelOrder(cancelOrder: CancelOrder): Unit = ???

override def addListener(omsListener: OmsListener): Unit = listeners = listeners ++ List(omsListener)

override def runOnce(): Unit = {
orders = orders.map(orderstate => {

if (orderstate.nextEventTime <= clock.now()) {

orderstate.state match {
case '~' =>
val orderId = OrderId.nextOrderId()
listeners.foreach(_.onAck(Ack(orderId, orderstate.clientOrderId, orderstate.symbol, orderstate.qty, orderstate.price)))
orderstate.copy(state = States.ACKED, nextEventTime = clock.now() + random.between(1000, 5000), orderId = orderId)
case 'A' =>
val remainingQty = orderstate.qty - orderstate.filledQty
val fillQty = if(remainingQty > 1) random.between(1, orderstate.qty - orderstate.filledQty) else 1
listeners.foreach(_.onFill(Fill(orderstate.orderId, fillQty, orderstate.price, orderstate.clientOrderId, orderstate.filledQty + fillQty)))
orderstate.copy(filledQty = orderstate.filledQty + fillQty, nextEventTime = clock.now() + random.between(1000, 5000))
case _ =>
orderstate
}

} else {
orderstate
}
})

orders = orders.filter(os => os.filledQty != os.qty)

}

}
Loading

0 comments on commit dcea3f7

Please sign in to comment.