Skip to content

Commit

Permalink
feat: Compaction endpoint, guarded (DEV-3703) (#3265)
Browse files Browse the repository at this point in the history
  • Loading branch information
siers authored Jun 6, 2024
1 parent 351b3e0 commit d5576b5
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 24 deletions.
58 changes: 41 additions & 17 deletions integration/src/test/scala/org/knora/webapi/core/LayersTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import zio.*

import org.knora.sipi.SipiServiceTestDelegator
import org.knora.sipi.WhichSipiService
import org.knora.webapi.config.AppConfig
import org.knora.webapi.config.AppConfig.AppConfigurations
import org.knora.webapi.config.AppConfig.AppConfigurationsTest
import org.knora.webapi.config.AppConfigForTestContainers
Expand Down Expand Up @@ -48,12 +49,14 @@ import org.knora.webapi.slice.ontology.api.service.RestCardinalityService
import org.knora.webapi.slice.ontology.api.service.RestCardinalityServiceLive
import org.knora.webapi.slice.ontology.domain.service.CardinalityService
import org.knora.webapi.slice.ontology.domain.service.OntologyRepo
import org.knora.webapi.slice.ontology.domain.service.OntologyService
import org.knora.webapi.slice.ontology.domain.service.OntologyServiceLive
import org.knora.webapi.slice.ontology.repo.service.OntologyCache
import org.knora.webapi.slice.ontology.repo.service.OntologyCacheLive
import org.knora.webapi.slice.ontology.repo.service.OntologyRepoLive
import org.knora.webapi.slice.ontology.repo.service.PredicateRepositoryLive
import org.knora.webapi.slice.resourceinfo.ResourceInfoLayers
import org.knora.webapi.slice.resourceinfo.api.ResourceInfoEndpoints
import org.knora.webapi.slice.resourceinfo.domain.IriConverter
import org.knora.webapi.slice.resources.repo.service.ResourcesRepo
import org.knora.webapi.slice.resources.repo.service.ResourcesRepoLive
Expand Down Expand Up @@ -90,61 +93,67 @@ object LayersTest {
AdminApiEndpoints &
AdminModule.Provided &
ApiRoutes &
ApiV2Endpoints &
AppRouter &
AssetPermissionsResponder &
Authenticator &
AuthorizationRestService &
CardinalityHandler &
CardinalityService &
ConstructResponseUtilV2 &
DspIngestClient &
GravsearchTypeInspectionRunner &
GroupRestService &
HttpServer &
IIIFRequestMessageHandler &
InferenceOptimizationService &
InvalidTokenCache &
IriConverter &
IriService &
JwtService &
ListsResponder &
ListsResponderV2 &
MessageRelay &
OntologyCache &
OntologyHelpers &
OntologyInferencer &
OntologyRepo &
OntologyResponderV2 &
OntologyService &
PermissionRestService &
PermissionUtilADM &
PermissionsResponder &
ProjectExportService &
ProjectExportStorageService &
ProjectImportService &
ProjectRestService &
RepositoryUpdater &
ResourceInfoEndpoints &
ResourceUtilV2 &
ResourcesRepo &
ResourcesResponderV2 &
RestCardinalityService &
SearchApiRoutes &
SearchEndpoints &
SearchResponderV2Module.Provided &
SipiService &
StandoffResponderV2 &
StandoffTagUtilV2 &
State &
StringFormatter &
TestClientService &
TriplestoreService &
UserRestService &
ValuesResponderV2

type CommonR1 =
ApiV2Endpoints &
HttpServer &
IIIFRequestMessageHandler &
ListsResponderV2 &
OntologyResponderV2 &
RepositoryUpdater &
ResourcesResponderV2 &
StandoffResponderV2
// format: on

private val commonLayersForAllIntegrationTests =
private val commonLayersForAllIntegrationTests1 =
ZLayer.makeSome[CommonR0, CommonR](
AdminApiModule.layer,
AdminModule.layer,
ApiRoutes.layer,
ApiV2Endpoints.layer,
AppRouter.layer,
AssetPermissionsResponder.layer,
AuthenticatorLive.layer,
Expand All @@ -155,23 +164,19 @@ object LayersTest {
ConstructResponseUtilV2Live.layer,
DspIngestClientLive.layer,
HandlerMapper.layer,
HttpServer.layer,
IIIFRequestMessageHandlerLive.layer,
InferenceOptimizationService.layer,
InvalidTokenCache.layer,
IriConverter.layer,
IriService.layer,
JwtServiceLive.layer,
KnoraResponseRenderer.layer,
ListsResponder.layer,
ListsResponderV2.layer,
ManagementEndpoints.layer,
ManagementRoutes.layer,
MessageRelayLive.layer,
OntologyCacheLive.layer,
OntologyHelpersLive.layer,
OntologyRepoLive.layer,
OntologyResponderV2Live.layer,
OntologyServiceLive.layer,
PermissionUtilADMLive.layer,
PermissionsResponder.layer,
Expand All @@ -180,17 +185,14 @@ object LayersTest {
ProjectExportServiceLive.layer,
ProjectExportStorageServiceLive.layer,
ProjectImportService.layer,
RepositoryUpdater.layer,
ResourceInfoLayers.live,
ResourceUtilV2Live.layer,
ResourcesRepoLive.layer,
ResourcesResponderV2.layer,
RestCardinalityServiceLive.layer,
SearchApiRoutes.layer,
SearchEndpoints.layer,
SearchResponderV2Module.layer,
SipiServiceTestDelegator.layer,
StandoffResponderV2.layer,
StandoffTagUtilV2Live.layer,
State.layer,
StringFormatter.live,
Expand All @@ -200,6 +202,28 @@ object LayersTest {
ValuesResponderV2Live.layer,
)

private val commonLayersForAllIntegrationTests2 =
ZLayer.makeSome[
CommonR0 & CommonR,
CommonR1,
](
OntologyResponderV2Live.layer,
StandoffResponderV2.layer,
ResourcesResponderV2.layer,
RepositoryUpdater.layer,
HttpServer.layer,
ApiV2Endpoints.layer,
ListsResponderV2.layer,
IIIFRequestMessageHandlerLive.layer,
IriService.layer,
)

private val commonLayersForAllIntegrationTests =
ZLayer.makeSome[CommonR0, CommonR & CommonR1](
commonLayersForAllIntegrationTests1,
commonLayersForAllIntegrationTests2,
)

private val fusekiAndSipiTestcontainers =
ZLayer.make[
AppConfigurations & DspIngestTestContainer & FusekiTestContainer & SharedVolumes.Images & SipiTestContainer &
Expand Down
2 changes: 2 additions & 0 deletions webapi/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,8 @@ app {
password = ${?KNORA_WEBAPI_TRIPLESTORE_FUSEKI_PASSWORD}
query-logging-threshold = 1s
query-logging-threshold = ${?KNORA_WEBAPI_TRIPLESTORE_FUSEKI_QUERY_LOGGING_THRESHOLD}
allow-compaction = false
allow-compaction = ${?KNORA_WEBAPI_TRIPLESTORE_FUSEKI_ALLOW_COMPACTION}
}

// If true, the time taken by each SPARQL query is logged at DEBUG level. To see these messages,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ final case class Fuseki(
username: String,
password: String,
queryLoggingThreshold: Duration = Duration.ofMillis(1000),
allowCompaction: Boolean = false,
)

final case class InstrumentationServerConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ import zio.json.JsonCodec
import org.knora.webapi.core.State
import org.knora.webapi.core.domain.AppState
import org.knora.webapi.http.version.BuildInfo
import org.knora.webapi.slice.common.api.AuthorizationRestService
import org.knora.webapi.slice.common.api.BaseEndpoints
import org.knora.webapi.slice.common.api.HandlerMapper
import org.knora.webapi.slice.common.api.PublicEndpointHandler
import org.knora.webapi.slice.common.api.SecuredEndpointHandler
import org.knora.webapi.slice.common.api.TapirToPekkoInterpreter
import org.knora.webapi.store.triplestore.api.TriplestoreService

final case class VersionResponse(
webapi: String,
Expand Down Expand Up @@ -85,6 +88,11 @@ final case class ManagementEndpoints(baseEndpoints: BaseEndpoints) {
.out(jsonBody[HealthResponse])
.out(statusCode)

private[infrastructure] val postStartCompaction = baseEndpoints.securedEndpoint.post
.in("start-compaction")
.out(jsonBody[String])
.out(statusCode)

val endpoints: Seq[AnyEndpoint] = List(getVersion, getHealth).map(_.tag("Management"))
}

Expand All @@ -97,6 +105,8 @@ final case class ManagementRoutes(
state: State,
mapper: HandlerMapper,
tapirToPekko: TapirToPekkoInterpreter,
triplestore: TriplestoreService,
auth: AuthorizationRestService,
) {

private val versionEndpointHandler =
Expand All @@ -111,8 +121,24 @@ final case class ManagementRoutes(
(response, if (response.status) StatusCode.Ok else StatusCode.ServiceUnavailable)
}

val routes = List(versionEndpointHandler, healthEndpointHandler)
.map(mapper.mapPublicEndpointHandler(_))
private val startCompactionHandler =
SecuredEndpointHandler[Unit, (String, StatusCode)](
endpoint.postStartCompaction,
user =>
_ =>
for {
_ <- auth.ensureSystemAdmin(user)
success <- triplestore.compact()
} yield if (success) ("ok", StatusCode.Ok) else ("forbidden", StatusCode.Forbidden),
)

val routes = (
List(versionEndpointHandler, healthEndpointHandler)
.map(mapper.mapPublicEndpointHandler(_))
++
List(startCompactionHandler)
.map(mapper.mapSecuredEndpointHandler(_))
)
.map(tapirToPekko.toRoute)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ trait TriplestoreService {
def uploadRepository(inputFile: Path): Task[Unit]

def dropGraph(graphName: String): Task[Unit]

def compact(): Task[Boolean]
}

object TriplestoreService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ trait FusekiTriplestore {

case class FusekiPaths(config: Fuseki) {
val checkServer: List[String] = List("$", "server")
val repository: List[String] = List(config.repositoryName)
val data: List[String] = repository :+ "data"
val get: List[String] = repository :+ "get"
val query: List[String] = repository :+ "query"
val update: List[String] = repository :+ "update"
val datasets: List[String] = List("$", "datasets")
val tasks: List[String] = List("$", "tasks")
val compact: List[String] = List("$", "compact", config.repositoryName)

val repository: List[String] = List(config.repositoryName)
val data: List[String] = repository :+ "data"
val get: List[String] = repository :+ "get"
val query: List[String] = repository :+ "query"
val update: List[String] = repository :+ "update"
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import sttp.client3.httpclient.zio.HttpClientZioBackend
import zio.*
import zio.json.*
import zio.json.ast.Json
import zio.json.ast.JsonCursor
import zio.metrics.Metric
import zio.nio.file.Path as NioPath

Expand Down Expand Up @@ -297,6 +298,36 @@ case class TriplestoreServiceLive(
fusekiServer <- ZIO.fromEither(response.body.toOption.getOrElse("").fromJson[FusekiServer]).mapError(Throwable(_))
} yield fusekiServer.datasets.find(ds => ds.dsName == s"/${fusekiConfig.repositoryName}" && ds.dsState).nonEmpty

override def compact(): Task[Boolean] =
ZIO
.when(fusekiConfig.allowCompaction) {
for {
_ <- ZIO.logInfo("Starting compaction")
request = authenticatedRequest.post(targetHostUri.addPath(paths.compact).addParam("deleteOld", "true"))
body <- doHttpRequest(request)
json <- ZIO.fromEither(body.body.merge.fromJson[Json]).mapError(new RuntimeException(_))
cursor = JsonCursor.field("taskId").isString
taskId <- ZIO.fromEither(json.get(cursor)).mapBoth(new RuntimeException(_), _.value)
_ <- ZIO.logInfo(s"Awaiting compaction task: $taskId")
_ <- awaitOnce(isFinished(taskId)).repeatWhileEquals(false)
_ <- ZIO.logInfo("Compaction finished.")
} yield ()
}
.map(_.isDefined)

private def awaitOnce[A](z: UIO[A]): UIO[Boolean] =
z.flatMap {
case true => ZIO.succeed(true)
case false => ZIO.sleep(20.seconds).as(false)
}

private def isFinished(taskId: String): UIO[Boolean] =
(for {
_ <- ZIO.logDebug(s"Checking compaction task: $taskId")
body <- doHttpRequest(authenticatedRequest.get(targetHostUri.addPath(paths.tasks).addPath(taskId)))
json <- ZIO.fromEither(body.body.merge.fromJson[Json]).mapError(new RuntimeException(_))
} yield json.get(JsonCursor.field("finished").isString).isRight).orDie

/**
* Initialize the Jena Fuseki triplestore. Currently only works for
* 'knora-test' and 'knora-test-unit' repository names. To be used, the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ final case class TriplestoreServiceInMemory(datasetRef: Ref[Dataset])(implicit v
override def dropGraph(graphName: IRI): Task[Unit] =
notImplemented

override def compact(): Task[Boolean] = ZIO.succeed(false)
}

object TriplestoreServiceInMemory {
Expand Down

0 comments on commit d5576b5

Please sign in to comment.