Skip to content

Commit

Permalink
#944 Added fix for threading related issue in ViewPortContainer.scala…
Browse files Browse the repository at this point in the history
… when change was called.
  • Loading branch information
chrisjstevo committed Nov 1, 2023
1 parent 12a5ff9 commit 985c035
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 111 deletions.
28 changes: 14 additions & 14 deletions vuu/src/main/scala/org/finos/vuu/net/ClientConnectionCreator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,17 @@ class DefaultMessageHandler(val channel: Channel,
sessionContainer: ClientSessionContainer,
moduleContainer: ModuleContainer)(implicit timeProvider: Clock) extends MessageHandler with StrictLogging {

val closeFuture = channel.closeFuture()
val closeFuture: ChannelFuture = channel.closeFuture()

closeFuture.addListener(new ChannelFutureListener {
override def operationComplete(f: ChannelFuture): Unit = {
logger.info("Calling disconnect() from future callback")
disconnect()
}
closeFuture.addListener((f: ChannelFuture) => {
logger.info("Calling disconnect() from future callback")
disconnect()
})

private def sendUpdatesInternal(updates: Seq[ViewPortUpdate], highPriority: Boolean = false) = {
if (!updates.isEmpty) {
if (updates.nonEmpty) {

logger.info(s"ASYNC-SVR-OUT: Sending ${updates.size} updates")

val formatted = formatDataOutbound(updates)

Expand Down Expand Up @@ -81,7 +81,7 @@ class DefaultMessageHandler(val channel: Channel,
}
}

def disconnect() = {
def disconnect(): ChannelFuture = {
serverApi.disconnect(session)
sessionContainer.remove(session)
channel.disconnect()
Expand All @@ -91,19 +91,19 @@ class DefaultMessageHandler(val channel: Channel,
protected def formatDataOutbound(outbound: Seq[ViewPortUpdate]): TableRowUpdates = {

val updates = outbound.filter(vpu => vpu.vpRequestId == vpu.vp.getRequestId).flatMap(vp => formatOneRowUpdate(vp)).toArray
//val updates = outbound.flatMap(vp => formatOneRowUpdate(vp)).toArray

val updateId = RequestId.oneNew()

TableRowUpdates(updateId, true, timeProvider.now, updates)
TableRowUpdates(updateId, isLast = true, timeProvider.now(), updates)
}

protected def formatOneRowUpdate(update: ViewPortUpdate): Option[RowUpdate] = {

update.vpUpdate match {
case SizeUpdateType => {
case SizeUpdateType =>
//logger.debug(s"SVR[VP] Size: vpid=${update.vp.id} size=${update.vp.size}")
Some(RowUpdate(update.vpRequestId, update.vp.id, update.size, update.index, update.key.key, UpdateType.SizeOnly, timeProvider.now(), 0, Array.empty))
}

case RowUpdateType =>

Expand Down Expand Up @@ -168,12 +168,12 @@ class DefaultMessageHandler(val channel: Channel,
case None =>
logger.error(s"Could not find impl for service ${rpc.service}")
Some(VsMsg(msg.requestId, msg.sessionId, msg.token, msg.user,
RpcResponse(rpc.method, null, Error(s"Handler not found for rpc call ${rpc} for service ${rpc.service} in module ${msg.module}", -1)))
RpcResponse(rpc.method, null, Error(s"Handler not found for rpc call $rpc for service ${rpc.service} in module ${msg.module}", -1)))
)
}
case None =>
Some(VsMsg(msg.requestId, msg.sessionId, msg.token, msg.user,
RpcResponse(rpc.method, null, Error(s"Handler not found for rpc call ${rpc} in module ${msg.module}", -1))))
RpcResponse(rpc.method, null, Error(s"Handler not found for rpc call $rpc in module ${msg.module}", -1))))
}
}

Expand All @@ -196,7 +196,7 @@ case class ClientSessionId(sessionId: String, user: String) extends Ordered[Clie

trait ClientSessionContainer {

def register(sessionId: ClientSessionId, messageHandler: MessageHandler)
def register(sessionId: ClientSessionId, messageHandler: MessageHandler): Unit

//def addConnection(session: ClientSessionId, channel: Channel, handler: InboundMessageHandler): Unit
def getHandler(sessionId: ClientSessionId): Option[MessageHandler]
Expand Down
6 changes: 5 additions & 1 deletion vuu/src/main/scala/org/finos/vuu/viewport/ViewPort.scala
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ class ViewPortImpl(val id: String,

val onlySortOrFilterChange = onlyFilterOrSortChanged(newStructuralFields, structuralFields.get())

logger.info(s"changeStructure(..) onlySortOrFilterChange=$onlySortOrFilterChange")

structuralFields.set(newStructuralFields)

if (!onlySortOrFilterChange)
Expand Down Expand Up @@ -277,6 +279,8 @@ class ViewPortImpl(val id: String,

val inrangeKeys = currentKeys.slice(from, to)

logger.info(s"Sending updates on ${inrangeKeys.length} inrangeKeys")

inrangeKeys.zip(from to to).foreach({ case (key, index) => publishHighPriorityUpdate(key, index) })
}

Expand Down Expand Up @@ -474,8 +478,8 @@ class ViewPortImpl(val id: String,
}

private def publishHighPriorityUpdate(key: String, index: Int): Unit = {
logger.debug(s"publishing update @[$index] = $key ")
if (this.enabled) {
logger.debug(s"publishing update @[$index] = $key ")
outboundQ.pushHighPriority(ViewPortUpdate(this.requestId, this, table, RowKeyUpdate(key, table), index, RowUpdateType, this.keys.length, timeProvider.now()))
}
}
Expand Down
13 changes: 10 additions & 3 deletions vuu/src/main/scala/org/finos/vuu/viewport/ViewPortContainer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,9 @@ class ViewPortContainer(val tableContainer: TableContainer, val providerContaine
UserDefinedFilterAndSort(aFilter, aSort)
}

//update the viewport request id, to prevent any unwanted updates going out while we're changing the viewport
viewPort.setRequestId(requestId)

//we are not grouped by, but we want to change to a group by
if (viewPort.getGroupBy == NoGroupBy && groupBy != NoGroupBy) {

Expand Down Expand Up @@ -512,12 +515,16 @@ class ViewPortContainer(val tableContainer: TableContainer, val providerContaine

} else {
logger.info("[VP] default else condition in change() call")
val structure = viewport.ViewPortStructuralFields(table = viewPort.table, columns = columns, viewPortDef = viewPort.getStructure.viewPortDef, filtAndSort = filtAndSort, filterSpec = filterSpec, groupBy = groupBy, viewPort.getTreeNodeStateStore, permissionChecker)
val structure = viewport.ViewPortStructuralFields(table = viewPort.table,
columns = columns, viewPortDef = viewPort.getStructure.viewPortDef,
filtAndSort = filtAndSort, filterSpec = filterSpec,
groupBy = groupBy, viewPort.getTreeNodeStateStore, permissionChecker
)
//viewPort.setRequestId(requestId)
viewPort.changeStructure(structure)
//viewPort.setKeys(viewPort.getKeys)
}

viewPort.setRequestId(requestId)

viewPort
}

Expand Down
Loading

0 comments on commit 985c035

Please sign in to comment.