Skip to content

Commit

Permalink
Merge pull request #49 from NordicSemiconductor/feature/pause-resume
Browse files Browse the repository at this point in the history
Pause and Resume
  • Loading branch information
philips77 authored Apr 5, 2022
2 parents 49d0fbf + 102783e commit 8ef9278
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@ import io.runtime.mcumgr.exception.InsufficientMtuException
import io.runtime.mcumgr.exception.McuMgrException
import io.runtime.mcumgr.managers.ImageManager
import io.runtime.mcumgr.response.UploadResponse
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import org.slf4j.LoggerFactory
import java.security.MessageDigest
import java.security.NoSuchAlgorithmException
Expand Down Expand Up @@ -60,9 +57,17 @@ fun ImageManager.windowUpload(
}

return object : TransferController {
override fun pause() = throw IllegalStateException("cannot pause window upload")
override fun resume() = throw IllegalStateException("cannot resume window upload")
var paused: Job? = null

override fun pause() {
paused = GlobalScope.launch { uploader.pause() }
}
override fun resume() {
uploader.resume()
paused = null
}
override fun cancel() {
paused?.cancel()
job.cancel()
}
}
Expand All @@ -79,7 +84,7 @@ private suspend fun Uploader.uploadCatchMtu() {
}

internal class ImageUploader(
private val imageData: ByteArray,
imageData: ByteArray,
private val image: Int,
private val imageManager: ImageManager,
windowCapacity: Int = 1,
Expand All @@ -101,25 +106,22 @@ internal class ImageUploader(
map: MutableMap<String, Any>
) {
map.takeIf { offset == 0 }?.apply {
put("len", imageData.size)
if (image > 0) {
put("image", image)
}
takeIf { image > 0 }?.let { put("image", image) }
sha(data)?.let { put("sha", it) }
}
}

override fun getAdditionalSize(offset: Int): Int = when {
offset > 0 -> 0
else -> {
val lenSize = cborStringLength("len") + cborUIntLength(imageData.size)
override fun getAdditionalSize(offset: Int): Int = when (offset) {
// "sha" and "image" params are only sent in the first packet.
0 -> {
val shaSize =
cborStringLength("sha") + cborUIntLength(TRUNCATED_HASH_LEN) + TRUNCATED_HASH_LEN
val imageSize = if (image > 0) {
cborStringLength("image") + cborUIntLength(image)
} else 0
lenSize + shaSize + imageSize
shaSize + imageSize
}
else -> 0
}

/**
Expand Down
46 changes: 39 additions & 7 deletions mcumgr-core/src/main/java/io/runtime/mcumgr/transfer/Uploader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ abstract class Uploader(
MutableStateFlow(UploadProgress(0, data.size))

val progress: Flow<UploadProgress> = _progress
private val resumed = Semaphore(1)

/**
* This method should send the request with given parameters.
Expand All @@ -49,6 +50,9 @@ abstract class Uploader(
callback: (UploadResult) -> Unit
)

/**
* Uploads the data.
*/
@Throws
suspend fun upload() = coroutineScope {
// Tracks the number of failures experienced for any given chunk,
Expand All @@ -68,6 +72,10 @@ abstract class Uploader(
while (true) {
window.acquire()

// Try acquiring resumed lock. If worked, release it immediately.
resumed.acquire()
resumed.release()

// Select the next chunk to send, prioritizing failed chunks.
val (chunk, resend) = select<Pair<Chunk, Boolean>?> {
failures.onReceive { it to true }
Expand Down Expand Up @@ -124,23 +132,34 @@ abstract class Uploader(
}
}

/**
* Pauses upload.
*/
suspend fun pause() {
resumed.acquire()
}

/**
* Resumes upload.
*/
fun resume() {
resumed.release()
}

private suspend fun writeInternal(
chunk: Chunk,
resend: Boolean,
scope: CoroutineScope,
callback: suspend (UploadResult) -> Unit
): Chunk {
val resultChannel: Channel<UploadResult> = Channel(1)
val map = prepareWrite(chunk.data, chunk.offset)
log.debug(map.toString())
write(map) {
resultChannel.trySend(it)
write(prepareWrite(chunk.data, chunk.offset)) { result ->
resultChannel.trySend(result)
}

return if (chunk.offset == 0) {
// Send the first chunk synchronously, to get the last offset.
val result = resultChannel.receive()
log.debug("Result received: $result")
callback(result)

result.onSuccess {
Expand Down Expand Up @@ -195,6 +214,9 @@ abstract class Uploader(
"data" to data,
"off" to offset
).also {
if (offset == 0) {
it["len"] = this.data.size // NOT data.size, as data is just a chunk of this.data
}
getAdditionalData(data, offset, it)
}

Expand Down Expand Up @@ -224,10 +246,18 @@ abstract class Uploader(
// Size of the string "off" plus the length of the offset integer
val offsetSize = cborStringLength("off") + cborUIntLength(offset)

// Size of the string "len" plus the length of the data size integer
// "len" is sent only in the initial packet.
val lengthSize = if (offset == 0) {
cborStringLength("len") + cborUIntLength(data.size)
} else {
0
}

// Implementation specific size
val implSpecificSize = getAdditionalSize(offset)

val combinedSize = headerSize + mapSize + offsetSize + implSpecificSize + dataStringSize
val combinedSize = headerSize + mapSize + offsetSize + lengthSize + implSpecificSize + dataStringSize

// Now we calculate the max amount of data that we can fit given the MTU.
val maxDataLength = mtu - combinedSize
Expand All @@ -243,6 +273,7 @@ abstract class Uploader(

/**
* This method should add additional parameters to the map.
* The "data", "len" and "off" parameters are already added.
*/
internal open fun getAdditionalData(
data: ByteArray,
Expand All @@ -254,7 +285,8 @@ abstract class Uploader(

/**
* This method should return an additional size of the CBOR payload, which will be placed to the
* packet with the given offset. By default only "data" and "off" are added.
* packet with the given offset.
* The "data", "len" and "off" parameters are already calculated.
*/
internal open fun getAdditionalSize(offset: Int) = 0
}
Expand Down

0 comments on commit 8ef9278

Please sign in to comment.