diff --git a/webapi/src/main/resources/application.conf b/webapi/src/main/resources/application.conf index 269462a2310..b27a4a42c96 100644 --- a/webapi/src/main/resources/application.conf +++ b/webapi/src/main/resources/application.conf @@ -431,6 +431,8 @@ app { password = ${?KNORA_WEBAPI_TRIPLESTORE_FUSEKI_PASSWORD} query-logging-threshold = 1s query-logging-threshold = ${?KNORA_WEBAPI_TRIPLESTORE_FUSEKI_QUERY_LOGGING_THRESHOLD} + allow-compaction = false + allow-compaction = ${?KNORA_WEBAPI_TRIPLESTORE_FUSEKI_ALLOW_COMPACTION} } // If true, the time taken by each SPARQL query is logged at DEBUG level. To see these messages, diff --git a/webapi/src/main/scala/org/knora/webapi/config/AppConfig.scala b/webapi/src/main/scala/org/knora/webapi/config/AppConfig.scala index 53b785a2c74..5348cf7f070 100644 --- a/webapi/src/main/scala/org/knora/webapi/config/AppConfig.scala +++ b/webapi/src/main/scala/org/knora/webapi/config/AppConfig.scala @@ -165,6 +165,7 @@ final case class Fuseki( username: String, password: String, queryLoggingThreshold: Duration = Duration.ofMillis(1000), + allowCompaction: Boolean = false, ) final case class InstrumentationServerConfig( diff --git a/webapi/src/main/scala/org/knora/webapi/slice/infrastructure/api/ManagementEndpoints.scala b/webapi/src/main/scala/org/knora/webapi/slice/infrastructure/api/ManagementEndpoints.scala index 578e7f19edf..ab5c93ef48e 100644 --- a/webapi/src/main/scala/org/knora/webapi/slice/infrastructure/api/ManagementEndpoints.scala +++ b/webapi/src/main/scala/org/knora/webapi/slice/infrastructure/api/ManagementEndpoints.scala @@ -23,6 +23,7 @@ import org.knora.webapi.slice.common.api.BaseEndpoints import org.knora.webapi.slice.common.api.HandlerMapper import org.knora.webapi.slice.common.api.PublicEndpointHandler import org.knora.webapi.slice.common.api.TapirToPekkoInterpreter +import org.knora.webapi.store.triplestore.api.TriplestoreService final case class VersionResponse( webapi: String, @@ -85,6 +86,11 @@ final case class ManagementEndpoints(baseEndpoints: BaseEndpoints) { .out(jsonBody[HealthResponse]) .out(statusCode) + private[infrastructure] val postStartCompaction = baseEndpoints.publicEndpoint.post + .in("start-compaction") + .out(jsonBody[String]) + .out(statusCode) + val endpoints: Seq[AnyEndpoint] = List(getVersion, getHealth).map(_.tag("Management")) } @@ -97,6 +103,7 @@ final case class ManagementRoutes( state: State, mapper: HandlerMapper, tapirToPekko: TapirToPekkoInterpreter, + triplestore: TriplestoreService, ) { private val versionEndpointHandler = @@ -111,7 +118,17 @@ final case class ManagementRoutes( (response, if (response.status) StatusCode.Ok else StatusCode.ServiceUnavailable) } - val routes = List(versionEndpointHandler, healthEndpointHandler) + private val startCompactionHandler = + PublicEndpointHandler[Unit, (String, StatusCode)]( + endpoint.postStartCompaction, + _ => { + triplestore.compact().map { allowed => + ("", if (allowed) StatusCode.Ok else StatusCode.Forbidden) + } + }, + ) + + val routes = List(versionEndpointHandler, healthEndpointHandler, startCompactionHandler) .map(mapper.mapPublicEndpointHandler(_)) .map(tapirToPekko.toRoute) } diff --git a/webapi/src/main/scala/org/knora/webapi/store/triplestore/api/TriplestoreService.scala b/webapi/src/main/scala/org/knora/webapi/store/triplestore/api/TriplestoreService.scala index 93be19ed58c..b89a0dc0657 100644 --- a/webapi/src/main/scala/org/knora/webapi/store/triplestore/api/TriplestoreService.scala +++ b/webapi/src/main/scala/org/knora/webapi/store/triplestore/api/TriplestoreService.scala @@ -142,6 +142,8 @@ trait TriplestoreService { def uploadRepository(inputFile: Path): Task[Unit] def dropGraph(graphName: String): Task[Unit] + + def compact(): Task[Boolean] } object TriplestoreService { diff --git a/webapi/src/main/scala/org/knora/webapi/store/triplestore/impl/FusekiPaths.scala b/webapi/src/main/scala/org/knora/webapi/store/triplestore/impl/FusekiPaths.scala index 98722595eb7..2796abcfa03 100644 --- a/webapi/src/main/scala/org/knora/webapi/store/triplestore/impl/FusekiPaths.scala +++ b/webapi/src/main/scala/org/knora/webapi/store/triplestore/impl/FusekiPaths.scala @@ -22,10 +22,13 @@ trait FusekiTriplestore { case class FusekiPaths(config: Fuseki) { val checkServer: List[String] = List("$", "server") - val repository: List[String] = List(config.repositoryName) - val data: List[String] = repository :+ "data" - val get: List[String] = repository :+ "get" - val query: List[String] = repository :+ "query" - val update: List[String] = repository :+ "update" val datasets: List[String] = List("$", "datasets") + val tasks: List[String] = List("$", "tasks") + val compact: List[String] = List("$", "compact", config.repositoryName) + + val repository: List[String] = List(config.repositoryName) + val data: List[String] = repository :+ "data" + val get: List[String] = repository :+ "get" + val query: List[String] = repository :+ "query" + val update: List[String] = repository :+ "update" } diff --git a/webapi/src/main/scala/org/knora/webapi/store/triplestore/impl/TriplestoreServiceLive.scala b/webapi/src/main/scala/org/knora/webapi/store/triplestore/impl/TriplestoreServiceLive.scala index 6cd4030c564..31357d03f7f 100644 --- a/webapi/src/main/scala/org/knora/webapi/store/triplestore/impl/TriplestoreServiceLive.scala +++ b/webapi/src/main/scala/org/knora/webapi/store/triplestore/impl/TriplestoreServiceLive.scala @@ -20,6 +20,7 @@ import sttp.client3.httpclient.zio.HttpClientZioBackend import zio.* import zio.json.* import zio.json.ast.Json +import zio.json.ast.JsonCursor import zio.metrics.Metric import zio.nio.file.Path as NioPath @@ -297,6 +298,36 @@ case class TriplestoreServiceLive( fusekiServer <- ZIO.fromEither(response.body.toOption.getOrElse("").fromJson[FusekiServer]).mapError(Throwable(_)) } yield fusekiServer.datasets.find(ds => ds.dsName == s"/${fusekiConfig.repositoryName}" && ds.dsState).nonEmpty + override def compact(): Task[Boolean] = + ZIO + .when(fusekiConfig.allowCompaction) { + for { + _ <- ZIO.logInfo("Starting compaction") + request = authenticatedRequest.post(targetHostUri.addPath(paths.compact).addParam("deleteOld", "true")) + body <- doHttpRequest(request) + json <- ZIO.fromEither(body.body.merge.fromJson[Json]).mapError(new RuntimeException(_)) + cursor = JsonCursor.field("taskId").isString + taskId <- ZIO.fromEither(json.get(cursor)).mapBoth(new RuntimeException(_), _.value) + _ <- ZIO.logInfo(s"Awaiting compaction task: $taskId") + _ <- awaitOnce(isFinished(taskId)).repeatWhileEquals(false) + _ <- ZIO.logInfo("Compaction finished.") + } yield () + } + .map(_.isDefined) + + private def awaitOnce[A](z: UIO[A]): UIO[Boolean] = + z.flatMap { + case true => ZIO.succeed(true) + case false => ZIO.sleep(20.seconds).as(false) + } + + private def isFinished(taskId: String): UIO[Boolean] = + (for { + _ <- ZIO.logDebug(s"Checking compaction task: $taskId") + body <- doHttpRequest(authenticatedRequest.get(targetHostUri.addPath(paths.tasks).addPath(taskId))) + json <- ZIO.fromEither(body.body.merge.fromJson[Json]).mapError(new RuntimeException(_)) + } yield json.get(JsonCursor.field("finished").isString).isRight).orDie + /** * Initialize the Jena Fuseki triplestore. Currently only works for * 'knora-test' and 'knora-test-unit' repository names. To be used, the diff --git a/webapi/src/test/scala/org/knora/webapi/store/triplestore/api/TriplestoreServiceInMemory.scala b/webapi/src/test/scala/org/knora/webapi/store/triplestore/api/TriplestoreServiceInMemory.scala index a4404426c28..a8908bd3c00 100644 --- a/webapi/src/test/scala/org/knora/webapi/store/triplestore/api/TriplestoreServiceInMemory.scala +++ b/webapi/src/test/scala/org/knora/webapi/store/triplestore/api/TriplestoreServiceInMemory.scala @@ -301,6 +301,7 @@ final case class TriplestoreServiceInMemory(datasetRef: Ref[Dataset])(implicit v override def dropGraph(graphName: IRI): Task[Unit] = notImplemented + override def compact(): Task[Boolean] = ZIO.succeed(false) } object TriplestoreServiceInMemory {