Skip to content

Commit

Permalink
Measure the time to write the destination file
Browse files Browse the repository at this point in the history
  • Loading branch information
cquiroz committed Nov 21, 2023
1 parent 80b1652 commit cbf8e36
Show file tree
Hide file tree
Showing 8 changed files with 829 additions and 401 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package edu.gemini.aspen.gds.fits

import cats.effect.Async
import cats.effect.Clock
import cats.syntax.all._
import com.google.common.io.{ Files => GFiles }
import edu.gemini.aspen.gds.configuration.{
Expand Down Expand Up @@ -30,7 +31,7 @@ object FitsFileProcessor {
def apply[F[_]](
fitsConfig: FitsConfig,
keywordConfigs: List[KeywordConfigurationItem]
)(implicit F: Async[F]): FitsFileProcessor[F] =
)(implicit F: Async[F]): FitsFileProcessor[F] =
new FitsFileProcessor[F] {
val requiredKeywords: Map[Int, List[String]] =
keywordConfigs
Expand All @@ -49,8 +50,10 @@ object FitsFileProcessor {
source <- F.delay(fitsConfig.sourceDir.resolve(inFileName))
dest <- safeDestinationFile(fitsConfig.destDir, outFileName)
cards <- processKeywordsToCards(keywords)
_ <- FitsFileTransferrer.transfer(source, dest, requiredKeywords, cards)
_ <- logger.infoF(s"FITS file $dest transfer completed.")
t1 <- Clock[F].realTime
c <- FitsFileTransferrer.transfer(source, dest, requiredKeywords, cards)
t2 <- Clock[F].realTime
_ <- logger.infoF(s"FITS file $dest transfer of $c bytes completed in ${t2-t1}.")
_ <- setPermissions(dest, fitsConfig.setPermissions)
_ <- setOwner(dest, fitsConfig.setOwner)
_ <- deleteOriginal(source, fitsConfig.deleteOriginal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ object FitsFileTransferrer {
output: Path,
requiredHeaders: Map[Int, List[String]],
additionalHeaders: Map[Int, List[FitsHeaderCard]]
): F[Unit] =
): F[Long] =
deleteIfExists(output) >>
stream(
Files[F]
Expand All @@ -208,5 +208,5 @@ object FitsFileTransferrer {
)
.through(Files[F].writeAll(output))
.compile
.drain
.count
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ object ObservationManager {
logger.infoF(s"Got an event $stateEvent")

new ObservationManager[F] {
def process(stateEvent: ObservationStateEvent): F[Unit] = {
println(stateEvent)
def process(stateEvent: ObservationStateEvent): F[Unit] =
stateEvent match {
case Start(dataLabel) =>
for {
Expand Down Expand Up @@ -104,7 +103,6 @@ object ObservationManager {
_ <- keys.traverse(purgeIfNeeded(_, now))
} yield ()
}
}

def withObsItem(dataLabel: DataLabel, event: ObservationStateEvent)(
action: ObservationItem[F] => F[Unit]
Expand Down
16 changes: 10 additions & 6 deletions gds/src/main/scala/edu/gemini/aspen/gds/osgi/Activator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global
import edu.gemini.aspen.gds.Main
import edu.gemini.aspen.gds.configuration.{ GDSConfigurationServiceFactory, GdsConfiguration }
import edu.gemini.aspen.gds.observations.{ ObservationEventReceiver, ObservationStateEvent }
import edu.gemini.aspen.gds.observations.ObservationStateEvent
import edu.gemini.aspen.giapi.data.{ DataLabel, ObservationEvent, ObservationEventHandler }
import edu.gemini.aspen.giapi.status.StatusDatabaseService
import edu.gemini.aspen.gmp.services.PropertyHolder
Expand Down Expand Up @@ -95,23 +95,27 @@ class Activator extends BundleActivator {
propTracker.foreach(_.open(true))

obsEventSvc = context
.registerService(classOf[ObservationEventHandler].getName,
new ObservationEventReceiver(handleObsEvent),
new util.Hashtable[String, String]()
.registerService(
classOf[ObservationEventHandler].getName,
new ObservationEventHandler {
def onObservationEvent(event: ObservationEvent, dataLabel: DataLabel): Unit =
handleObsEvent(dataLabel, event)
},
new util.Hashtable[String, String]()
)
.some

// wait for config and start running if we receive it. Otherwise timeout and stop GDS
val run = IO.sleep(5.seconds) *> configDeferred.get.attempt.flatMap {
case Left(_) | Right(None) =>
case Left(_) | Right(None) =>
IO {
logger.severe("GDS timed out waiting for configuration. Shutting down.")
// Trying to cancel the fiber in stop() will result in stop() never completing.
// The fiber is stopping on it's own, anyway.
fiber = none
context.getBundle().stop()
}.void
case Right(Some(config)) =>
case Right(Some(config)) =>
Main.run(config, epicsReaderRef, statusDbRef, observationStateEventQ)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ private[seqexec] final case class DataLabelRequest(
object Decoders {
implicit val dataLabelDecoder: Decoder[DataLabel] =
Decoder.decodeString.emapTry(s => Try(new DataLabel(s)))
implicit val fitsKeywordDecoder: Decoder[FitsKeyword] = Decoder.decodeString.emap(FitsKeyword(_))
implicit val fitsKeywordDecoder: Decoder[FitsKeyword] =
Decoder.decodeString.emap(FitsKeyword(_))
implicit val fitsTypeDecoder: Decoder[FitsType] = Decoder.decodeString.emap(FitsType.fromString)

// The incoming json has 3 fields
Expand Down
Loading

0 comments on commit cbf8e36

Please sign in to comment.