Skip to content

Commit

Permalink
feat: Compaction endpoint, guarded (DEV-3703)
Browse files Browse the repository at this point in the history
  • Loading branch information
siers committed Jun 4, 2024
1 parent d7fe791 commit 6531413
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 6 deletions.
2 changes: 2 additions & 0 deletions webapi/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,8 @@ app {
password = ${?KNORA_WEBAPI_TRIPLESTORE_FUSEKI_PASSWORD}
query-logging-threshold = 1s
query-logging-threshold = ${?KNORA_WEBAPI_TRIPLESTORE_FUSEKI_QUERY_LOGGING_THRESHOLD}
allow-compaction = false
allow-compaction = ${?KNORA_WEBAPI_TRIPLESTORE_FUSEKI_ALLOW_COMPACTION}
}

// If true, the time taken by each SPARQL query is logged at DEBUG level. To see these messages,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ final case class Fuseki(
username: String,
password: String,
queryLoggingThreshold: Duration = Duration.ofMillis(1000),
allowCompaction: Boolean = false,
)

final case class InstrumentationServerConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.knora.webapi.slice.common.api.BaseEndpoints
import org.knora.webapi.slice.common.api.HandlerMapper
import org.knora.webapi.slice.common.api.PublicEndpointHandler
import org.knora.webapi.slice.common.api.TapirToPekkoInterpreter
import org.knora.webapi.store.triplestore.api.TriplestoreService

final case class VersionResponse(
webapi: String,
Expand Down Expand Up @@ -85,6 +86,11 @@ final case class ManagementEndpoints(baseEndpoints: BaseEndpoints) {
.out(jsonBody[HealthResponse])
.out(statusCode)

/**
 * Endpoint description for starting a compaction: a POST on the public base endpoint with
 * the path segment `start-compaction`. It produces a JSON string body and a status code.
 */
private[infrastructure] val postStartCompaction = {
  val startCompaction = baseEndpoints.publicEndpoint.post.in("start-compaction")
  startCompaction.out(jsonBody[String]).out(statusCode)
}

// The version and health endpoints, each tagged "Management".
// postStartCompaction is not part of this list.
// NOTE(review): presumably left out on purpose (e.g. to keep it out of generated API docs) — confirm.
val endpoints: Seq[AnyEndpoint] = List(getVersion, getHealth).map(_.tag("Management"))
}

Expand All @@ -97,6 +103,7 @@ final case class ManagementRoutes(
state: State,
mapper: HandlerMapper,
tapirToPekko: TapirToPekkoInterpreter,
triplestore: TriplestoreService,
) {

private val versionEndpointHandler =
Expand All @@ -111,7 +118,17 @@ final case class ManagementRoutes(
(response, if (response.status) StatusCode.Ok else StatusCode.ServiceUnavailable)
}

val routes = List(versionEndpointHandler, healthEndpointHandler)
/**
 * Handler for the start-compaction endpoint. It asks the triplestore to compact and turns the
 * Boolean result into the response status: `true` becomes 200 OK, `false` becomes 403 Forbidden.
 * The response body is always the empty string.
 */
private val startCompactionHandler =
  PublicEndpointHandler[Unit, (String, StatusCode)](
    endpoint.postStartCompaction,
    _ =>
      triplestore.compact().map {
        case true  => ("", StatusCode.Ok)
        case false => ("", StatusCode.Forbidden)
      },
  )

// All management handlers (version, health, start-compaction), each passed through the
// handler mapper and then converted to a route by the Tapir-to-Pekko interpreter.
val routes = List(versionEndpointHandler, healthEndpointHandler, startCompactionHandler)
.map(mapper.mapPublicEndpointHandler(_))
.map(tapirToPekko.toRoute)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ trait TriplestoreService {
def uploadRepository(inputFile: Path): Task[Unit]

def dropGraph(graphName: String): Task[Unit]

/**
 * Requests a compaction of the triplestore.
 *
 * @return `true` when a compaction was carried out, `false` when the implementation did not
 *         run one (for example because it is not allowed by configuration).
 */
def compact(): Task[Boolean]
}

object TriplestoreService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ trait FusekiTriplestore {

/**
 * Path segments of the Fuseki HTTP endpoints used by the triplestore client, derived from
 * the given configuration.
 *
 * Each value is a list of path segments, not a pre-joined string.
 *
 * @param config the Fuseki configuration; its `repositoryName` is used for the
 *               repository-scoped paths and for the compaction path.
 */
case class FusekiPaths(config: Fuseki) {
  // Paths under the "$" prefix.
  val checkServer: List[String] = List("$", "server")
  val datasets: List[String]    = List("$", "datasets")
  val tasks: List[String]       = List("$", "tasks")
  val compact: List[String]     = List("$", "compact", config.repositoryName)

  // Paths below the configured repository. Each name is defined exactly once:
  // a second definition of the same val in this class body does not compile.
  val repository: List[String] = List(config.repositoryName)
  val data: List[String]       = repository :+ "data"
  val get: List[String]        = repository :+ "get"
  val query: List[String]      = repository :+ "query"
  val update: List[String]     = repository :+ "update"
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import sttp.client3.httpclient.zio.HttpClientZioBackend
import zio.*
import zio.json.*
import zio.json.ast.Json
import zio.json.ast.JsonCursor
import zio.metrics.Metric
import zio.nio.file.Path as NioPath

Expand Down Expand Up @@ -297,6 +298,36 @@ case class TriplestoreServiceLive(
fusekiServer <- ZIO.fromEither(response.body.toOption.getOrElse("").fromJson[FusekiServer]).mapError(Throwable(_))
} yield fusekiServer.datasets.find(ds => ds.dsName == s"/${fusekiConfig.repositoryName}" && ds.dsState).nonEmpty

/**
 * Starts a compaction on the Fuseki server and waits until the server reports it as finished,
 * but only when `fusekiConfig.allowCompaction` is set.
 *
 * When allowed, it POSTs to the compaction path with `deleteOld=true`, reads the string field
 * `taskId` from the JSON response and then polls the task until it is finished. The polling
 * loop has no upper time limit.
 *
 * @return `true` if compaction was allowed and ran to completion, `false` if it is not allowed.
 *         Fails if a response is not valid JSON or lacks a string `taskId`.
 */
override def compact(): Task[Boolean] = {
  // Kept as a `def` so nothing is built unless compaction is actually allowed.
  def startAndAwait =
    for {
      _            <- ZIO.logInfo("Starting compaction")
      startRequest  = authenticatedRequest.post(targetHostUri.addPath(paths.compact).addParam("deleteOld", "true"))
      response     <- doHttpRequest(startRequest)
      parsed       <- ZIO.fromEither(response.body.merge.fromJson[Json]).mapError(new RuntimeException(_))
      taskIdCursor  = JsonCursor.field("taskId").isString
      taskId       <- ZIO.fromEither(parsed.get(taskIdCursor)).mapBoth(new RuntimeException(_), _.value)
      _            <- ZIO.logInfo(s"Awaiting compaction task: $taskId")
      // Poll until the task is reported as finished; awaitOnce pauses between unsuccessful checks.
      _            <- awaitOnce(isFinished(taskId)).repeatWhileEquals(false)
      _            <- ZIO.logInfo("Compaction finished.")
    } yield ()

  ZIO.when(fusekiConfig.allowCompaction)(startAndAwait).map(_.isDefined)
}

/**
 * Runs the given check once. If it yields `false`, sleeps for 20 seconds before returning,
 * so that a caller repeating this effect polls at that interval.
 *
 * The parameter is typed as `UIO[Boolean]` because only `true` and `false` are handled;
 * a generic result type would allow values the match cannot cover.
 *
 * @param z effect yielding `true` once the awaited condition holds
 * @return the value produced by `z`; `false` is returned only after the 20 second pause
 */
private def awaitOnce(z: UIO[Boolean]): UIO[Boolean] =
  z.flatMap { done =>
    if (done) ZIO.succeed(true)
    else ZIO.sleep(20.seconds).as(false)
  }

/**
 * Checks once whether the given Fuseki task is finished by fetching the task below the tasks
 * path and looking for a string field named `finished` in the JSON response.
 *
 * Any failure of the request or of JSON parsing is turned into a defect via `orDie`.
 *
 * @param taskId id of the task to look up
 * @return `true` if the response contains a string `finished` field, `false` otherwise
 */
private def isFinished(taskId: String): UIO[Boolean] = {
  val finishedCursor = JsonCursor.field("finished").isString

  val check =
    for {
      _        <- ZIO.logDebug(s"Checking compaction task: $taskId")
      taskUri   = targetHostUri.addPath(paths.tasks).addPath(taskId)
      response <- doHttpRequest(authenticatedRequest.get(taskUri))
      parsed   <- ZIO.fromEither(response.body.merge.fromJson[Json]).mapError(new RuntimeException(_))
    } yield parsed.get(finishedCursor).isRight

  check.orDie
}

/**
* Initialize the Jena Fuseki triplestore. Currently only works for
* 'knora-test' and 'knora-test-unit' repository names. To be used, the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ final case class TriplestoreServiceInMemory(datasetRef: Ref[Dataset])(implicit v
// This implementation does not drop anything; it evaluates to the shared `notImplemented` value.
override def dropGraph(graphName: IRI): Task[Unit] =
notImplemented

// This implementation never runs a compaction: it always succeeds with `false`.
override def compact(): Task[Boolean] = ZIO.succeed(false)
}

object TriplestoreServiceInMemory {
Expand Down

0 comments on commit 6531413

Please sign in to comment.