Skip to content

Commit

Permalink
Upgrade gds-fp v0.0.9 - the typelevel dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
toddburnside committed Nov 23, 2023
1 parent 77ea193 commit 7df6cef
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 65 deletions.
2 changes: 1 addition & 1 deletion gds/src/main/scala/edu/gemini/aspen/gds/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object Main {
)
obsMgr <- ObservationManager(config.observation, kwMgr, obsStateQ, fitsQ)
fitsProcessor = FitsFileProcessor[IO](config.fitsConfig, keywordConfig)
seqexecServer = SeqexecServer(obsStateQ, config.seqexecHost, config.seqexecPort)
seqexecServer = Stream.eval(SeqexecServer(obsStateQ, config.seqexecHost, config.seqexecPort))
obsPurge = Stream
.fixedDelay[IO](config.observation.cleanupRate)
.foreach(_ => obsStateQ.offer(ObservationStateEvent.PurgeStale))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package edu.gemini.aspen.gds.configuration
import cats.data.{ NonEmptyChain, ValidatedNec }
import cats.data.Validated.{ Invalid, Valid }
import cats.syntax.all._
import com.comcast.ip4s.Host
import com.comcast.ip4s.Port
import edu.gemini.aspen.gmp.services.PropertyHolder
import fs2.io.file.{ Path => Fs2Path }
import java.nio.file._
import java.util.{ Dictionary }
import java.util.logging.Logger
Expand Down Expand Up @@ -61,8 +64,8 @@ class GDSConfigurationServiceFactory(
val eventSleep = asDuration(props, "observation.event.sleep")
val keywordRetries = asPosInt(props, "keyword.collection.retries")
val keywordSleep = asDuration(props, "keyword.collection.sleep")
val seqexecPort = asPosInt(props, "seqexec.server.port")
val seqexecHost = asString(props, "seqexec.server.host")
val seqexecPort = asPort(props, "seqexec.server.port")
val seqexecHost = asHost(props, "seqexec.server.host")
val fitsDelOrig = asBool(props, "fits.deleteOriginal")

val configValidated = (keywordConfig,
Expand Down Expand Up @@ -103,6 +106,16 @@ class GDSConfigurationServiceFactory(
private def asString(props: Map[String, _], key: String): ValidatedNec[String, String] =
props.get(key).map(_.toString).toValidNec(s"Config value missing for `$key`")

private def asHost(props: Map[String, _], key: String): ValidatedNec[String, Host] =
asString(props, key).andThen(s =>
Host.fromString(s).toValidNec(s"Invalid host name `$s` for `$key`")
)

private def asPort(props: Map[String, _], key: String): ValidatedNec[String, Port] =
asString(props, key).andThen(s =>
Port.fromString(s).toValidNec(s"Invalid port value`$s` for `$key`. Must be a positive integer.")
)

private def asPosInt(props: Map[String, _], key: String): ValidatedNec[String, Int] =
asString(props, key).andThen(s =>
s.toIntOption
Expand All @@ -127,25 +140,27 @@ class GDSConfigurationServiceFactory(
Option.when(duration.isFinite)(duration.asInstanceOf[FiniteDuration])
}

private def fitsSrcDir: ValidatedNec[String, Path] = dirFromPropHolder(fitsSrcPathKey)
private def fitsDestDir: ValidatedNec[String, Path] = dirFromPropHolder(fitsDestPathKey)
private def fitsSrcDir: ValidatedNec[String, Fs2Path] = dirFromPropHolder(fitsSrcPathKey)
private def fitsDestDir: ValidatedNec[String, Fs2Path] = dirFromPropHolder(fitsDestPathKey)

// PropertyHolder shouldn't return null, but...
private def propHolderValue(key: String): ValidatedNec[String, String] =
Option(propertyHolder.getProperty(key))
.toValidNec(s"Value for PropertyHolder sevice key `$key` cannot be null.")

private def dirFromPropHolder(key: String): ValidatedNec[String, Path] =
private def dirFromPropHolder(key: String): ValidatedNec[String, Fs2Path] =
propHolderValue(key).andThen { dir =>
Try {
// Not using FS2 io here, because it would introduce an `F[_]', which would not work well at
// this point in osgi
val path = Paths.get(dir)
if (path.toFile().isDirectory()) path.validNec
else
s"Value `$dir` for PropertyHolder service key `$key` is not a valid directory".invalidNec
} match {
case Failure(e) =>
s"Error creating Path for PropertyHolder service key `$key`, value `$dir`: ${e.getMessage}".invalidNec
case Success(value) => value
case Success(value) => value.map(p => Fs2Path.fromNioPath(p))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package edu.gemini.aspen.gds.configuration

import java.nio.file.Path
import fs2.io.file.Path
import com.comcast.ip4s.Host
import com.comcast.ip4s.Port
import scala.concurrent.duration._

final case class ObservationConfig(
Expand All @@ -25,7 +27,7 @@ final case class GdsConfiguration(
keywords: List[KeywordConfigurationItem],
observation: ObservationConfig,
keywordRetries: RetryConfig,
seqexecPort: Int,
seqexecHost: String,
seqexecPort: Port,
seqexecHost: Host,
fitsConfig: FitsConfig
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import edu.gemini.aspen.gds.syntax.all._
import edu.gemini.aspen.gds.transfer.FitsFileTransferrer
import edu.gemini.aspen.giapi.data.DataLabel
import fs2.io.file.{ Files => Fs2Files }
import java.nio.file.Path
import fs2.io.file.Path
import java.util.logging.Logger
import scala.concurrent.duration._
import scala.sys.process._
Expand All @@ -28,10 +28,10 @@ sealed trait FitsFileProcessor[F[_]] {
object FitsFileProcessor {
private val logger = Logger.getLogger(this.getClass.getName)

def apply[F[_]](
def apply[F[_]: Async: Fs2Files](
fitsConfig: FitsConfig,
keywordConfigs: List[KeywordConfigurationItem]
)(implicit F: Async[F]): FitsFileProcessor[F] =
): FitsFileProcessor[F] =
new FitsFileProcessor[F] {
val requiredKeywords: Map[Int, List[String]] =
keywordConfigs
Expand All @@ -47,7 +47,7 @@ object FitsFileProcessor {

val result = for {
_ <- logger.infoF(s"Preparing to transfer FITS file $outFileName")
source <- F.delay(fitsConfig.sourceDir.resolve(inFileName))
source = fitsConfig.sourceDir.resolve(inFileName)
dest <- safeDestinationFile(fitsConfig.destDir, outFileName)
cards <- processKeywordsToCards(keywords)
t1 <- Clock[F].realTime
Expand All @@ -66,16 +66,18 @@ object FitsFileProcessor {
)
}

def safeDestinationFile(dir: Path, name: String): F[Path] = for {
fullPath <- F.delay(dir.resolve(name))
exists <- Fs2Files[F].exists(fullPath)
safePath <-
if (exists)
logger.warningF(
s"Output file $fullPath already exists - generating new name."
) >> safeDestinationFile(dir, newDestinationFileName(name))
else fullPath.pure[F]
} yield safePath
def safeDestinationFile(dir: Path, name: String): F[Path] = {
val fullPath = dir.resolve(name)
for {
exists <- Fs2Files[F].exists(fullPath)
safePath <-
if (exists)
logger.warningF(
s"Output file $fullPath already exists - generating new name."
) >> safeDestinationFile(dir, newDestinationFileName(name))
else fullPath.pure[F]
} yield safePath
}

def newDestinationFileName(fullName: String): String = {
val nameRegex = """(\w*)-(\d+)""".r
Expand Down Expand Up @@ -177,9 +179,9 @@ object FitsFileProcessor {
for {
_ <- logger.infoF(s"Changing ownership of `$dest` to `${c.owner}` with command: $cmd")
b <- runCommand(cmd)
_ <- if (b) F.unit else logger.severeF(s"Failed to change ownership of `$dest`")
_ <- if (b) Async[F].unit else logger.severeF(s"Failed to change ownership of `$dest`")
} yield ()
case None => F.unit
case None => Async[F].unit
}

def setPermissions(dest: Path, config: Option[SetPermissionsConfig]): F[Unit] = config match {
Expand All @@ -190,25 +192,25 @@ object FitsFileProcessor {
s"Changing permissions of `$dest` to `${c.permissions}` with command: $cmd"
)
b <- runCommand(cmd)
_ <- if (b) F.unit else logger.severeF(s"Failed to change permissions of `$dest`")
_ <- if (b) Async[F].unit else logger.severeF(s"Failed to change permissions of `$dest`")
} yield ()
case None => F.unit
case None => Async[F].unit
}

def deleteOriginal(file: Path, deleteIt: Boolean): F[Unit] =
if (deleteIt)
logger.infoF(s"Deleting original FITS file: $file") >> Fs2Files[F]
.delete(file)
.handleErrorWith(e => logger.severeF(s"Failed to delete original file: $file", e))
else F.unit
else Async[F].unit

def runCommand(command: String): F[Boolean] = {
val plogger =
ProcessLogger(normalLine => logger.info(s"Command output: $normalLine"),
errorLine => logger.severe(s"Command error: $errorLine")
)
for {
process <- F.delay(command.run(plogger))
process <- Async[F].delay(command.run(plogger))
// we need to timeout in case the command hangs. For example, if sudo is used, and a password is required.
// Neither of the commands should take long to completed.
result <- waitForResult(process, 10)
Expand All @@ -224,8 +226,8 @@ object FitsFileProcessor {

def waitForResult(process: Process, remaining: Int): F[Option[Int]] =
if (process.isAlive())
if (remaining > 0) F.sleep(250.milliseconds) >> waitForResult(process, remaining - 1)
else F.blocking(process.destroy()).as(none)
else F.blocking(process.exitValue().some)
if (remaining > 0) Async[F].sleep(250.milliseconds) >> waitForResult(process, remaining - 1)
else Async[F].blocking(process.destroy()).as(none)
else Async[F].blocking(process.exitValue().some)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import cats.syntax.all._
import edu.gemini.aspen.gds.fits._
import edu.gemini.aspen.gds.syntax.all._
import fs2._
import fs2.io.file.Files
import java.nio.file.Path
import fs2.io.file.{ Files, Flags, Path }
import java.util.logging.Logger

object FitsFileTransferrer {
Expand Down Expand Up @@ -177,13 +176,12 @@ object FitsFileTransferrer {
in => go(in, ParserState.empty).stream
}

private def deleteIfExists[F[_]: Async](output: Path): F[Unit] =
private def deleteIfExists[F[_]: Async: Files](output: Path): F[Unit] =
Files[F].exists(output).flatMap {
case true =>
logger.warningF(s"Output file $output already exists. It will be deleted.") >> Files[F]
.deleteIfExists(output)
.void
case false => Applicative[F].unit
logger.warningF(s"Output file $output already exists. It will be deleted.") >>
Files[F].deleteIfExists(output).void
case false => Async[F].unit
}

def stream[F[_]: Async](
Expand All @@ -193,7 +191,7 @@ object FitsFileTransferrer {
): Stream[F, Byte] =
input.through(fitsPipe(requiredHeaders, additionalHeaders))

def transfer[F[_]: Async](
def transfer[F[_]: Async: Files](
input: Path,
output: Path,
requiredHeaders: Map[Int, List[String]],
Expand All @@ -203,15 +201,13 @@ object FitsFileTransferrer {
_ <- deleteIfExists(output)
ref <- Ref.of(0L)
_ <- stream(
Files[F]
.readAll(input, chunkSize = RecordLength),
Files[F].readAll(input, chunkSize = RecordLength, flags = Flags.Read),
requiredHeaders,
additionalHeaders
)
.chunks
.evalTap(c => ref.update(_ + c.size))
// if/when fs2 is upgraded, can switch to `unchunks` instead of flatMap
.flatMap(Stream.chunk(_))
.unchunks
.through(Files[F].writeAll(output))
.compile
.drain
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package edu.gemini.aspen.gds.keywords

import cats.effect.kernel._
import cats.effect.std.MapRef
import cats.effect.syntax.all._
import cats.syntax.all._
import edu.gemini.aspen.gds.configuration.RetryConfig
import edu.gemini.aspen.gds.syntax.all._
import edu.gemini.aspen.giapi.data.{ DataLabel, ObservationEvent }
import io.chrisdavenport.mapref.MapRef
import io.chrisdavenport.mapref.implicits._
import java.util.logging.Logger

sealed trait KeywordManager[F[_]] {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package edu.gemini.aspen.gds.observations

import cats.effect.{ Async, Clock }
import cats.effect.{ Async, Clock, Ref }
import cats.effect.std.MapRef
import cats.effect.std.QueueSink
import cats.syntax.all._
import edu.gemini.aspen.gds.configuration.ObservationConfig
import edu.gemini.aspen.gds.keywords.{ CollectedKeyword, KeywordManager }
import edu.gemini.aspen.gds.observations.ObservationStateEvent._
import edu.gemini.aspen.gds.syntax.all._
import edu.gemini.aspen.giapi.data.DataLabel
import io.chrisdavenport.mapref.MapRef
import io.chrisdavenport.mapref.implicits._
import java.util.logging.Logger
import scala.concurrent.duration._

Expand All @@ -26,7 +25,9 @@ object ObservationManager {
obsStateQ: QueueSink[F, ObservationStateEvent],
fitsQ: QueueSink[F, (DataLabel, List[CollectedKeyword])]
)(implicit F: Async[F]): F[ObservationManager[F]] =
MapRef.ofConcurrentHashMap[F, DataLabel, ObservationItem[F]]().map { mapref =>
Ref.of(Map[DataLabel, ObservationItem[F]]()).map { refOfMap =>
val mapref = MapRef.fromSingleImmutableMapRef(refOfMap)

def addDataLabel(dataLabel: DataLabel): F[ObservationItem[F]] =
for {
_ <- logIfExists(dataLabel)
Expand Down Expand Up @@ -98,7 +99,7 @@ object ObservationManager {
case PurgeStale =>
for {
_ <- logger.infoF("Checking for expired observations.")
keys <- mapref.keys
keys <- refOfMap.get.map(_.keys.toList)
now <- Clock[F].realTime
_ <- keys.traverse(purgeIfNeeded(_, now))
} yield ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ package edu.gemini.aspen.gds.seqexec
import cats.effect._
import cats.effect.std.QueueSink
import cats.syntax.all._
import com.comcast.ip4s.Host
import com.comcast.ip4s.Port
import edu.gemini.aspen.gds.keywords.CollectedKeyword
import edu.gemini.aspen.gds.model.KeywordSource
import edu.gemini.aspen.gds.observations.ObservationStateEvent
import edu.gemini.aspen.gds.observations.ObservationStateEvent._
import fs2.Stream
import org.http4s._
import org.http4s.circe._
import org.http4s.dsl.io._
import org.http4s.implicits._
import org.http4s.blaze.server._
import org.http4s.ember.server._
import org.http4s.server.Router

import Decoders._
Expand All @@ -26,9 +27,9 @@ object SeqexecServer {

def apply(
obsStateQ: QueueSink[IO, ObservationStateEvent],
host: String,
port: Integer
): Stream[IO, Unit] = {
host: Host,
port: Port
): IO[Unit] = {
def kwv2Collected(kwv: KeywordValue) =
CollectedKeyword.Value(kwv.keyword, KeywordSource.SeqExec, none, kwv.value)

Expand Down Expand Up @@ -69,10 +70,12 @@ object SeqexecServer {

val httpApp = Router("gds-seqexec" -> service).orNotFound

BlazeServerBuilder[IO](scala.concurrent.ExecutionContext.Implicits.global)
.bindHttp(port, host)
EmberServerBuilder
.default[IO]
.withHost(host)
.withPort(port)
.withHttpApp(httpApp)
.serve
.as(())
.build
.use(_ => IO.never)
}
}
5 changes: 1 addition & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@
<slf4j.api.version>1.7.30</slf4j.api.version>
<maven-release-plugin.version>3.0.0-M1</maven-release-plugin.version>

<fs2.version>3.0.4</fs2.version>
<fs2.core>fs2-core_2.13</fs2.core>
<fs2.io>fs2-io_2.13</fs2.io>
<gmp.version>${project.version}</gmp.version>
<!-- Global encoding -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down Expand Up @@ -761,7 +758,7 @@
<dependency>
<groupId>edu.gemini.gds</groupId>
<artifactId>gds-fp</artifactId>
<version>0.0.8</version>
<version>0.0.9</version>
<exclusions>
<exclusion>
<artifactId>co.fs2</artifactId>
Expand Down

0 comments on commit 7df6cef

Please sign in to comment.