diff --git a/gds/src/main/scala/edu/gemini/aspen/gds/Main.scala b/gds/src/main/scala/edu/gemini/aspen/gds/Main.scala index a07a7a082..12fb24531 100644 --- a/gds/src/main/scala/edu/gemini/aspen/gds/Main.scala +++ b/gds/src/main/scala/edu/gemini/aspen/gds/Main.scala @@ -38,7 +38,7 @@ object Main { ) obsMgr <- ObservationManager(config.observation, kwMgr, obsStateQ, fitsQ) fitsProcessor = FitsFileProcessor[IO](config.fitsConfig, keywordConfig) - seqexecServer = SeqexecServer(obsStateQ, config.seqexecHost, config.seqexecPort) + seqexecServer = Stream.eval(SeqexecServer(obsStateQ, config.seqexecHost, config.seqexecPort)) obsPurge = Stream .fixedDelay[IO](config.observation.cleanupRate) .foreach(_ => obsStateQ.offer(ObservationStateEvent.PurgeStale)) diff --git a/gds/src/main/scala/edu/gemini/aspen/gds/configuration/GDSConfigurationServiceFactory.scala b/gds/src/main/scala/edu/gemini/aspen/gds/configuration/GDSConfigurationServiceFactory.scala index 26e770444..a23e88839 100644 --- a/gds/src/main/scala/edu/gemini/aspen/gds/configuration/GDSConfigurationServiceFactory.scala +++ b/gds/src/main/scala/edu/gemini/aspen/gds/configuration/GDSConfigurationServiceFactory.scala @@ -3,7 +3,10 @@ package edu.gemini.aspen.gds.configuration import cats.data.{ NonEmptyChain, ValidatedNec } import cats.data.Validated.{ Invalid, Valid } import cats.syntax.all._ +import com.comcast.ip4s.Host +import com.comcast.ip4s.Port import edu.gemini.aspen.gmp.services.PropertyHolder +import fs2.io.file.{ Path => Fs2Path } import java.nio.file._ import java.util.{ Dictionary } import java.util.logging.Logger @@ -61,8 +64,8 @@ class GDSConfigurationServiceFactory( val eventSleep = asDuration(props, "observation.event.sleep") val keywordRetries = asPosInt(props, "keyword.collection.retries") val keywordSleep = asDuration(props, "keyword.collection.sleep") - val seqexecPort = asPosInt(props, "seqexec.server.port") - val seqexecHost = asString(props, "seqexec.server.host") + val seqexecPort = asPort(props, "seqexec.server.port") + val seqexecHost = asHost(props, "seqexec.server.host") val fitsDelOrig = asBool(props, "fits.deleteOriginal") val configValidated = (keywordConfig, @@ -103,6 +106,16 @@ class GDSConfigurationServiceFactory( private def asString(props: Map[String, _], key: String): ValidatedNec[String, String] = props.get(key).map(_.toString).toValidNec(s"Config value missing for `$key`") + private def asHost(props: Map[String, _], key: String): ValidatedNec[String, Host] = + asString(props, key).andThen(s => + Host.fromString(s).toValidNec(s"Invalid host name `$s` for `$key`") + ) + + private def asPort(props: Map[String, _], key: String): ValidatedNec[String, Port] = + asString(props, key).andThen(s => + Port.fromString(s).toValidNec(s"Invalid port value`$s` for `$key`. Must be a positive integer.") + ) + private def asPosInt(props: Map[String, _], key: String): ValidatedNec[String, Int] = asString(props, key).andThen(s => s.toIntOption @@ -127,17 +140,19 @@ class GDSConfigurationServiceFactory( Option.when(duration.isFinite)(duration.asInstanceOf[FiniteDuration]) } - private def fitsSrcDir: ValidatedNec[String, Path] = dirFromPropHolder(fitsSrcPathKey) - private def fitsDestDir: ValidatedNec[String, Path] = dirFromPropHolder(fitsDestPathKey) + private def fitsSrcDir: ValidatedNec[String, Fs2Path] = dirFromPropHolder(fitsSrcPathKey) + private def fitsDestDir: ValidatedNec[String, Fs2Path] = dirFromPropHolder(fitsDestPathKey) // PropertyHolder shouldn't return null, but... private def propHolderValue(key: String): ValidatedNec[String, String] = Option(propertyHolder.getProperty(key)) .toValidNec(s"Value for PropertyHolder sevice key `$key` cannot be null.") - private def dirFromPropHolder(key: String): ValidatedNec[String, Path] = + private def dirFromPropHolder(key: String): ValidatedNec[String, Fs2Path] = propHolderValue(key).andThen { dir => Try { + // Not using FS2 io here, because it would introduce an `F[_]', which would not work well at + // this point in osgi val path = Paths.get(dir) if (path.toFile().isDirectory()) path.validNec else @@ -145,7 +160,7 @@ class GDSConfigurationServiceFactory( } match { case Failure(e) => s"Error creating Path for PropertyHolder service key `$key`, value `$dir`: ${e.getMessage}".invalidNec - case Success(value) => value + case Success(value) => value.map(p => Fs2Path.fromNioPath(p)) } } diff --git a/gds/src/main/scala/edu/gemini/aspen/gds/configuration/GdsConfiguration.scala b/gds/src/main/scala/edu/gemini/aspen/gds/configuration/GdsConfiguration.scala index e1271c581..5ea9a1260 100644 --- a/gds/src/main/scala/edu/gemini/aspen/gds/configuration/GdsConfiguration.scala +++ b/gds/src/main/scala/edu/gemini/aspen/gds/configuration/GdsConfiguration.scala @@ -1,6 +1,8 @@ package edu.gemini.aspen.gds.configuration -import java.nio.file.Path +import fs2.io.file.Path +import com.comcast.ip4s.Host +import com.comcast.ip4s.Port import scala.concurrent.duration._ final case class ObservationConfig( @@ -25,7 +27,7 @@ final case class GdsConfiguration( keywords: List[KeywordConfigurationItem], observation: ObservationConfig, keywordRetries: RetryConfig, - seqexecPort: Int, - seqexecHost: String, + seqexecPort: Port, + seqexecHost: Host, fitsConfig: FitsConfig ) diff --git a/gds/src/main/scala/edu/gemini/aspen/gds/fits/FitsFileProcessor.scala b/gds/src/main/scala/edu/gemini/aspen/gds/fits/FitsFileProcessor.scala index 1947872cf..14749bf61 100644 --- a/gds/src/main/scala/edu/gemini/aspen/gds/fits/FitsFileProcessor.scala +++ b/gds/src/main/scala/edu/gemini/aspen/gds/fits/FitsFileProcessor.scala @@ -16,7 +16,7 @@ import edu.gemini.aspen.gds.syntax.all._ import edu.gemini.aspen.gds.transfer.FitsFileTransferrer import edu.gemini.aspen.giapi.data.DataLabel import fs2.io.file.{ Files => Fs2Files } -import java.nio.file.Path +import fs2.io.file.Path import java.util.logging.Logger import scala.concurrent.duration._ import scala.sys.process._ @@ -28,10 +28,10 @@ sealed trait FitsFileProcessor[F[_]] { object FitsFileProcessor { private val logger = Logger.getLogger(this.getClass.getName) - def apply[F[_]]( + def apply[F[_]: Async: Fs2Files]( fitsConfig: FitsConfig, keywordConfigs: List[KeywordConfigurationItem] - )(implicit F: Async[F]): FitsFileProcessor[F] = + ): FitsFileProcessor[F] = new FitsFileProcessor[F] { val requiredKeywords: Map[Int, List[String]] = keywordConfigs @@ -47,7 +47,7 @@ object FitsFileProcessor { val result = for { _ <- logger.infoF(s"Preparing to transfer FITS file $outFileName") - source <- F.delay(fitsConfig.sourceDir.resolve(inFileName)) + source = fitsConfig.sourceDir.resolve(inFileName) dest <- safeDestinationFile(fitsConfig.destDir, outFileName) cards <- processKeywordsToCards(keywords) t1 <- Clock[F].realTime @@ -66,16 +66,18 @@ object FitsFileProcessor { ) } - def safeDestinationFile(dir: Path, name: String): F[Path] = for { - fullPath <- F.delay(dir.resolve(name)) - exists <- Fs2Files[F].exists(fullPath) - safePath <- - if (exists) - logger.warningF( - s"Output file $fullPath already exists - generating new name." - ) >> safeDestinationFile(dir, newDestinationFileName(name)) - else fullPath.pure[F] - } yield safePath + def safeDestinationFile(dir: Path, name: String): F[Path] = { + val fullPath = dir.resolve(name) + for { + exists <- Fs2Files[F].exists(fullPath) + safePath <- + if (exists) + logger.warningF( + s"Output file $fullPath already exists - generating new name." + ) >> safeDestinationFile(dir, newDestinationFileName(name)) + else fullPath.pure[F] + } yield safePath + } def newDestinationFileName(fullName: String): String = { val nameRegex = """(\w*)-(\d+)""".r @@ -177,9 +179,9 @@ object FitsFileProcessor { for { _ <- logger.infoF(s"Changing ownership of `$dest` to `${c.owner}` with command: $cmd") b <- runCommand(cmd) - _ <- if (b) F.unit else logger.severeF(s"Failed to change ownership of `$dest`") + _ <- if (b) Async[F].unit else logger.severeF(s"Failed to change ownership of `$dest`") } yield () - case None => F.unit + case None => Async[F].unit } def setPermissions(dest: Path, config: Option[SetPermissionsConfig]): F[Unit] = config match { @@ -190,9 +192,9 @@ object FitsFileProcessor { s"Changing permissions of `$dest` to `${c.permissions}` with command: $cmd" ) b <- runCommand(cmd) - _ <- if (b) F.unit else logger.severeF(s"Failed to change permissions of `$dest`") + _ <- if (b) Async[F].unit else logger.severeF(s"Failed to change permissions of `$dest`") } yield () - case None => F.unit + case None => Async[F].unit } def deleteOriginal(file: Path, deleteIt: Boolean): F[Unit] = @@ -200,7 +202,7 @@ object FitsFileProcessor { logger.infoF(s"Deleting original FITS file: $file") >> Fs2Files[F] .delete(file) .handleErrorWith(e => logger.severeF(s"Failed to delete original file: $file", e)) - else F.unit + else Async[F].unit def runCommand(command: String): F[Boolean] = { val plogger = @@ -208,7 +210,7 @@ object FitsFileProcessor { errorLine => logger.severe(s"Command error: $errorLine") ) for { - process <- F.delay(command.run(plogger)) + process <- Async[F].delay(command.run(plogger)) // we need to timeout in case the command hangs. For example, if sudo is used, and a password is required. // Neither of the commands should take long to completed. result <- waitForResult(process, 10) @@ -224,8 +226,8 @@ object FitsFileProcessor { def waitForResult(process: Process, remaining: Int): F[Option[Int]] = if (process.isAlive()) - if (remaining > 0) F.sleep(250.milliseconds) >> waitForResult(process, remaining - 1) - else F.blocking(process.destroy()).as(none) - else F.blocking(process.exitValue().some) + if (remaining > 0) Async[F].sleep(250.milliseconds) >> waitForResult(process, remaining - 1) + else Async[F].blocking(process.destroy()).as(none) + else Async[F].blocking(process.exitValue().some) } } diff --git a/gds/src/main/scala/edu/gemini/aspen/gds/fits/FitsFileTransferrer.scala b/gds/src/main/scala/edu/gemini/aspen/gds/fits/FitsFileTransferrer.scala index c02742e0f..fc887e1bb 100644 --- a/gds/src/main/scala/edu/gemini/aspen/gds/fits/FitsFileTransferrer.scala +++ b/gds/src/main/scala/edu/gemini/aspen/gds/fits/FitsFileTransferrer.scala @@ -7,8 +7,7 @@ import cats.syntax.all._ import edu.gemini.aspen.gds.fits._ import edu.gemini.aspen.gds.syntax.all._ import fs2._ -import fs2.io.file.Files -import java.nio.file.Path +import fs2.io.file.{ Files, Flags, Path } import java.util.logging.Logger object FitsFileTransferrer { @@ -177,13 +176,12 @@ object FitsFileTransferrer { in => go(in, ParserState.empty).stream } - private def deleteIfExists[F[_]: Async](output: Path): F[Unit] = + private def deleteIfExists[F[_]: Async: Files](output: Path): F[Unit] = Files[F].exists(output).flatMap { case true => - logger.warningF(s"Output file $output already exists. It will be deleted.") >> Files[F] - .deleteIfExists(output) - .void - case false => Applicative[F].unit + logger.warningF(s"Output file $output already exists. It will be deleted.") >> + Files[F].deleteIfExists(output).void + case false => Async[F].unit } def stream[F[_]: Async]( @@ -193,7 +191,7 @@ object FitsFileTransferrer { ): Stream[F, Byte] = input.through(fitsPipe(requiredHeaders, additionalHeaders)) - def transfer[F[_]: Async]( + def transfer[F[_]: Async: Files]( input: Path, output: Path, requiredHeaders: Map[Int, List[String]], @@ -203,15 +201,13 @@ object FitsFileTransferrer { _ <- deleteIfExists(output) ref <- Ref.of(0L) _ <- stream( - Files[F] - .readAll(input, chunkSize = RecordLength), + Files[F].readAll(input, chunkSize = RecordLength, flags = Flags.Read), requiredHeaders, additionalHeaders ) .chunks .evalTap(c => ref.update(_ + c.size)) - // if/when fs2 is upgraded, can switch to `unchunks` instead of flatMap - .flatMap(Stream.chunk(_)) + .unchunks .through(Files[F].writeAll(output)) .compile .drain diff --git a/gds/src/main/scala/edu/gemini/aspen/gds/keywords/KeywordManager.scala b/gds/src/main/scala/edu/gemini/aspen/gds/keywords/KeywordManager.scala index e4fdcc802..e69a8f25a 100644 --- a/gds/src/main/scala/edu/gemini/aspen/gds/keywords/KeywordManager.scala +++ b/gds/src/main/scala/edu/gemini/aspen/gds/keywords/KeywordManager.scala @@ -1,13 +1,12 @@ package edu.gemini.aspen.gds.keywords import cats.effect.kernel._ +import cats.effect.std.MapRef import cats.effect.syntax.all._ import cats.syntax.all._ import edu.gemini.aspen.gds.configuration.RetryConfig import edu.gemini.aspen.gds.syntax.all._ import edu.gemini.aspen.giapi.data.{ DataLabel, ObservationEvent } -import io.chrisdavenport.mapref.MapRef -import io.chrisdavenport.mapref.implicits._ import java.util.logging.Logger sealed trait KeywordManager[F[_]] { diff --git a/gds/src/main/scala/edu/gemini/aspen/gds/observations/ObservationManager.scala b/gds/src/main/scala/edu/gemini/aspen/gds/observations/ObservationManager.scala index 511bb90f4..97ab092b3 100644 --- a/gds/src/main/scala/edu/gemini/aspen/gds/observations/ObservationManager.scala +++ b/gds/src/main/scala/edu/gemini/aspen/gds/observations/ObservationManager.scala @@ -1,6 +1,7 @@ package edu.gemini.aspen.gds.observations -import cats.effect.{ Async, Clock } +import cats.effect.{ Async, Clock, Ref } +import cats.effect.std.MapRef import cats.effect.std.QueueSink import cats.syntax.all._ import edu.gemini.aspen.gds.configuration.ObservationConfig @@ -8,8 +9,6 @@ import edu.gemini.aspen.gds.keywords.{ CollectedKeyword, KeywordManager } import edu.gemini.aspen.gds.observations.ObservationStateEvent._ import edu.gemini.aspen.gds.syntax.all._ import edu.gemini.aspen.giapi.data.DataLabel -import io.chrisdavenport.mapref.MapRef -import io.chrisdavenport.mapref.implicits._ import java.util.logging.Logger import scala.concurrent.duration._ @@ -26,7 +25,9 @@ object ObservationManager { obsStateQ: QueueSink[F, ObservationStateEvent], fitsQ: QueueSink[F, (DataLabel, List[CollectedKeyword])] )(implicit F: Async[F]): F[ObservationManager[F]] = - MapRef.ofConcurrentHashMap[F, DataLabel, ObservationItem[F]]().map { mapref => + Ref.of(Map[DataLabel, ObservationItem[F]]()).map { refOfMap => + val mapref = MapRef.fromSingleImmutableMapRef(refOfMap) + def addDataLabel(dataLabel: DataLabel): F[ObservationItem[F]] = for { _ <- logIfExists(dataLabel) @@ -98,7 +99,7 @@ object ObservationManager { case PurgeStale => for { _ <- logger.infoF("Checking for expired observations.") - keys <- mapref.keys + keys <- refOfMap.get.map(_.keys.toList) now <- Clock[F].realTime _ <- keys.traverse(purgeIfNeeded(_, now)) } yield () diff --git a/gds/src/main/scala/edu/gemini/aspen/gds/seqexec/SeqexecServer.scala b/gds/src/main/scala/edu/gemini/aspen/gds/seqexec/SeqexecServer.scala index 197f322af..7ab6459b0 100644 --- a/gds/src/main/scala/edu/gemini/aspen/gds/seqexec/SeqexecServer.scala +++ b/gds/src/main/scala/edu/gemini/aspen/gds/seqexec/SeqexecServer.scala @@ -3,16 +3,17 @@ package edu.gemini.aspen.gds.seqexec import cats.effect._ import cats.effect.std.QueueSink import cats.syntax.all._ +import com.comcast.ip4s.Host +import com.comcast.ip4s.Port import edu.gemini.aspen.gds.keywords.CollectedKeyword import edu.gemini.aspen.gds.model.KeywordSource import edu.gemini.aspen.gds.observations.ObservationStateEvent import edu.gemini.aspen.gds.observations.ObservationStateEvent._ -import fs2.Stream import org.http4s._ import org.http4s.circe._ import org.http4s.dsl.io._ import org.http4s.implicits._ -import org.http4s.blaze.server._ +import org.http4s.ember.server._ import org.http4s.server.Router import Decoders._ @@ -26,9 +27,9 @@ object SeqexecServer { def apply( obsStateQ: QueueSink[IO, ObservationStateEvent], - host: String, - port: Integer - ): Stream[IO, Unit] = { + host: Host, + port: Port + ): IO[Unit] = { def kwv2Collected(kwv: KeywordValue) = CollectedKeyword.Value(kwv.keyword, KeywordSource.SeqExec, none, kwv.value) @@ -69,10 +70,12 @@ object SeqexecServer { val httpApp = Router("gds-seqexec" -> service).orNotFound - BlazeServerBuilder[IO](scala.concurrent.ExecutionContext.Implicits.global) - .bindHttp(port, host) + EmberServerBuilder + .default[IO] + .withHost(host) + .withPort(port) .withHttpApp(httpApp) - .serve - .as(()) + .build + .use(_ => IO.never) } } diff --git a/pom.xml b/pom.xml index d03e0acb1..99ec2a7ad 100644 --- a/pom.xml +++ b/pom.xml @@ -56,9 +56,6 @@ 1.7.30 3.0.0-M1 - 3.0.4 - fs2-core_2.13 - fs2-io_2.13 ${project.version} UTF-8 @@ -761,7 +758,7 @@ edu.gemini.gds gds-fp - 0.0.8 + 0.0.9 co.fs2 @@ -932,6 +929,10 @@ + + Local + file:///Users/tburnside/code/noirlab/maven-repo/releases + Gemini Development https://github.com/gemini-hlsw/maven-repo/raw/master/releases