Skip to content

Commit

Permalink
Merge pull request #287 from an-tex/add-evolver-spec
Browse files Browse the repository at this point in the history
add EvolverSpec
  • Loading branch information
an-tex authored Nov 15, 2023
2 parents bfde12d + acf954c commit 1946aa8
Show file tree
Hide file tree
Showing 5 changed files with 388 additions and 349 deletions.
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,12 @@ lazy val `elastic-testkit` = (project in file("elastic/testkit"))
circe.generic.value,
circe.genericExtras.value,
macwire.macros,
logback.classic % Test,
logback.core % Test,
),
idePackagePrefix := Some("net.sc8s.elastic.testkit")
)
.dependsOn(`elastic-core`, `schevo-circe`.jvm)
.dependsOn(`elastic-core`, `schevo-circe`.jvm, `akka-components-testkit` % Test)

lazy val `elastic-lagom-api` =
crossProject(JSPlatform, JVMPlatform)
Expand Down
2 changes: 1 addition & 1 deletion elastic/core/src/main/scala/Evolver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ object Evolver extends ClusterComponent.Singleton {
}
}

private val mappingsHashField = "mappingHash"
private[elastic] val mappingsHashField = "mappingHash"

private val analysisHashField = "analysisHash"

Expand Down
349 changes: 2 additions & 347 deletions elastic/core/src/test/scala/net/sc8s/elastic/EvolverSpec.scala
Original file line number Diff line number Diff line change
@@ -1,348 +1,3 @@
//package net.sc8s.elastic
package net.sc8s.elastic

// TODO disabled for now as it sometimes fails in gitlab without any error, issue with mixing scalatest and specs? prolly should be moved into separate repo anyway...
/*
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit
import akka.actor.ActorSystem
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.scaladsl.adapter._
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl.{RichFuture => _, _}
import com.sksamuel.elastic4s.akka.{AkkaHttpClient, AkkaHttpClientSettings}
import com.sksamuel.elastic4s.circe._
import com.sksamuel.elastic4s.requests.indexes.Field
import com.sksamuel.elastic4s.requests.mappings.{FieldDefinition, KeywordField, TextField}
import com.typesafe.config.ConfigFactory
import io.circe.generic.extras.semiauto._
import io.circe.parser._
import io.circe.syntax._
import io.circe.{Decoder, Encoder, Json}
import mu.moin.elastic.evolution.Evolver.{EvolveDocuments, MigrateIndices, RunBatchUpdates}
import mu.moin.elastic.evolution.EvolverSpec._
import mu.moin.elastic.evolution.Index.BatchUpdate
import mu.moin.schemaevolution.Schema
import org.scalatest.Inspectors._
import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
class EvolverSpec extends AkkaTypedSpec with Matchers with Eventually {
private val elasticClient = ElasticClient(AkkaHttpClient(AkkaHttpClientSettings(Seq(sys.env.getOrElse("ELASTIC_URL", "localhost:9210")))))
private val testIndices = Seq(TestIndex1, TestIndex2)
"Evolver" should {
"create indices, mappings and aliases" in new EvolverContext {
evolver ! MigrateIndices(testIndices.map(_.name), forceReindex = false)
eventually {
val response = elasticClient.execute(catAliases()).awaitResult
response.result.map(_.alias) should contain theSameElementsAs testIndices.map(_.name)
forAll(response.result.map(_.index)) { index =>
val _index :: dateTime :: Nil = index.split("-", 2).toList
testIndices.map(_.name) should contain(_index)
LocalDateTime.parse(dateTime, Index.indexNameSuffixFormatter).until(LocalDateTime.now, ChronoUnit.SECONDS) should be <= 10L
}
}
elasticClient.execute(getIndex(TestIndex1.name)).awaitResult.result.head._2.mappings.properties shouldBe Map(
"name" -> Field(Some("keyword")),
"class" -> Field(Some("keyword"))
)
}
"keep existing aliases and indices" in new EvolverContext {
evolver ! MigrateIndices(testIndices.map(_.name), forceReindex = false)
val existingAliases = eventually {
val response = elasticClient.execute(catAliases()).awaitResult
response.result should have length testIndices.length
response.result
}
evolver ! MigrateIndices(testIndices.map(_.name), forceReindex = false)
process()
elasticClient.execute(catAliases()).awaitResult.result shouldBe existingAliases
}
def addDocumentV2() = {
elasticClient.execute(indexInto(TestIndex1.name) source TestIndex1.DocumentV2("name1").asJson {
import Index.configuration
deriveConfiguredEncoder[EvolverSpec.TestIndex1.DocumentV2]
}).awaitResult
eventually(elasticClient.execute(count(TestIndex1.name)).awaitResult.result.count shouldBe 1)
}
"migrate to updated mapping by reindexing" in new EvolverContext(TestIndex1) {
evolver ! MigrateIndices(Seq(TestIndex1.name), forceReindex = false)
val originalIndex = eventually {
val aliases = elasticClient.execute(catAliases()).awaitResult.result.find(_.alias == TestIndex1.name)
aliases should not be empty
aliases.get.index
}
addDocumentV2()
spawnEvolver(TestIndex1MappingUpdated) ! MigrateIndices(Seq(TestIndex1MappingUpdated.name), forceReindex = false)
eventually {
elasticClient.execute(catAliases()).awaitResult.result.filter(_.alias == TestIndex1.name).map(_.index) should not contain originalIndex
val mappings = elasticClient.execute(getIndex(TestIndex1.name)).awaitResult.result.head._2.mappings
mappings.properties shouldBe Map("name" -> Field(Some("text")), "class" -> Field(Some("keyword")))
mappings.meta.get(Evolver.mappingsHashField) should contain(TestIndex1MappingUpdated.mappingsHash)
elasticClient.execute(count(TestIndex1.name)).awaitResult.result.count shouldBe 1
}
}
"migrate by adding mapping" in new EvolverContext(TestIndex1) {
evolver ! MigrateIndices(Nil, forceReindex = false)
val originalIndex = eventually {
val aliases = elasticClient.execute(catAliases()).awaitResult.result.find(_.alias == TestIndex1.name)
aliases should not be empty
aliases.get.index
}
addDocumentV2()
spawnEvolver(TestIndex1MappingAdded) ! MigrateIndices(Nil, forceReindex = false)
eventually {
elasticClient.execute(catAliases()).awaitResult.result.filter(_.alias == TestIndex1.name).map(_.index) should contain(originalIndex)
val mappings = elasticClient.execute(getIndex(TestIndex1.name)).awaitResult.result.head._2.mappings
mappings.properties shouldBe Map("name" -> Field(Some("keyword")), "added" -> Field(Some("keyword")), "class" -> Field(Some("keyword")))
mappings.meta.get(Evolver.mappingsHashField) should contain(TestIndex1MappingAdded.mappingsHash)
elasticClient.execute(count(TestIndex1.name)).awaitResult.result.count shouldBe 1
}
}
"skip migration of mappings" in new EvolverContext(TestIndex1) {
evolver ! MigrateIndices(Nil, forceReindex = false)
eventually(elasticClient.execute(catAliases()).awaitResult.result should not be empty)
elasticClient.execute(putMapping(TestIndex1.name) fields Seq(KeywordField("deleted"))).awaitResult
evolver ! MigrateIndices(Nil, forceReindex = false)
process()
elasticClient.execute(getIndex(TestIndex1.name)).awaitResult.result.head._2.mappings.properties === Map("name" -> Field(Some("keyword")), "deleted" -> Field(Some("keyword")))
}
"evolve documents with older version" in new EvolverContext(IndexV1) {
evolver ! MigrateIndices(Nil, forceReindex = false)
eventually(elasticClient.execute(catAliases()).awaitResult.result should not be empty)
val document1_V1 = IndexV1.DocumentV1("first1", "last1", Some(1))
val document2_V2 = Json.obj(
"class" -> IndexV2.latestVersion.asJson,
"skip this" -> "otherwise it would fail evolution".asJson
)
val document3_V1 = IndexV1.DocumentV1("first3", "last3", Some(3))
elasticClient.execute(indexInto(IndexV1.name) source document1_V1.latestTrait).awaitResult
elasticClient.execute(indexInto(IndexV1.name) source document2_V2).awaitResult
elasticClient.execute(indexInto(IndexV1.name) source document3_V1.latestTrait).awaitResult
eventually(elasticClient.execute(count(IndexV1.name)).awaitResult.result.count shouldBe 3)
spawnEvolver(IndexV2) ! EvolveDocuments(Nil)
val document1_V2 = IndexV2.DocumentV1(document1_V1.firstName, document1_V1.lastName, None).migrate
val document3_V2 = IndexV2.DocumentV1(document3_V1.firstName, document3_V1.lastName, None).migrate
eventually(elasticClient.execute(search(IndexV2.name)).awaitResult.result.hits.hits.map(_.sourceAsString).map(parse(_).right.get) should contain theSameElementsAs Seq(
document1_V2.latestTrait.asJson,
document2_V2,
document3_V2.latestTrait.asJson
))
}
"run batch updates" in new EvolverContext(IndexV1) {
evolver ! MigrateIndices(Nil, forceReindex = false)
eventually(elasticClient.execute(catAliases()).awaitResult.result should not be empty)
val document = IndexV1.DocumentV1("first1", "last1", Some(1))
elasticClient.execute(indexInto(IndexV1.name) source document.latestTrait).awaitResult
eventually(elasticClient.execute(count(IndexV1.name)).awaitResult.result.count shouldBe 1)
evolver ! RunBatchUpdates(IndexV1.name, "clearAge")
eventually(elasticClient.execute(search(IndexV2.name)).awaitResult.result.hits.hits.map(_.sourceAsString).map(parse(_).right.get) should contain theSameElementsAs Seq(
document.latestTrait.asJson.mapObject(_.remove("age"))
))
}
}
private def deleteAllIndicesAndAliases() =
elasticClient.execute(catAliases()).flatMap(response => Future.sequence(
response.result.map(aliasResponse => elasticClient.execute(removeAlias(aliasResponse.alias, aliasResponse.index)))
)).flatMap(_ =>
elasticClient.execute(catIndices()).flatMap(response => elasticClient.execute(deleteIndex(response.result.map(_.index): _*)))
).awaitResult
//private def cleanAllIndices() = elasticClient.execute(catIndices()).flatMap(response => elasticClient.execute(clearIndex(response.result.map(_.index)))).awaitResult
//
class EvolverContext(indices: Index*) {
def spawnEvolver(indices: Index*) = testKit.spawn(Evolver(elasticClient, if (indices.isEmpty) testIndices else indices))
val evolver = spawnEvolver(indices: _ *)
deleteAllIndicesAndAliases()
}
}
object EvolverSpec {
trait TestIndex1Base extends Index {
override val name = "test_index1"
override val latestVersion = latestVersionHelper[LatestCaseClass]
override val mappings: Seq[FieldDefinition] = Seq(KeywordField("name"))
sealed trait Versioned extends Schema.Versioned {
override type LatestTrait = Document
}
sealed trait Document extends Versioned with Schema.Latest {
override type LatestCaseClass = DocumentV3
val name: String
val deleted: Boolean
}
case class DocumentV3(name: String, deleted: Boolean) extends Document {
override def migrate = this
override def caseClass = this
}
case class DocumentV2(name: String) extends Versioned {
override def migrate = DocumentV3(name, deleted = false).migrate
}
case class DocumentV1(firstName: String, lastName: String) extends Versioned {
override def migrate = DocumentV2(s"$firstName $lastName").migrate
}
override implicit val versionedDecoder: Decoder[Versioned] = deriveConfiguredDecoder
override implicit val latestInterfaceEncoder: Encoder[Document] = deriveConfiguredEncoder
}
object TestIndex1 extends TestIndex1Base
object TestIndex1MappingUpdated extends TestIndex1Base {
override val mappings: Seq[FieldDefinition] = Seq(TextField("name"))
}
object TestIndex1MappingAdded extends TestIndex1Base {
override val mappings = Seq(KeywordField("name"), KeywordField("added"))
}
object TestIndex2 extends Index {
override val name = "test_index2"
override val latestVersion = latestVersionHelper[LatestCaseClass]
case class Document(parameter: String) extends Schema.Versioned with Schema.Latest {
override type LatestTrait = Document
override type LatestCaseClass = Document
override def migrate = this
override def caseClass = this
}
override type Versioned = Document
override implicit val versionedDecoder: Decoder[Versioned] = deriveConfiguredDecoder
override implicit val latestInterfaceEncoder: Encoder[Document] = deriveConfiguredEncoder
}
object IndexV1 extends Index {
override val name = "index"
override val latestVersion = latestVersionHelper[LatestCaseClass]
sealed trait Versioned extends Schema.Versioned {
override type LatestTrait = Document
}
sealed trait Document extends Versioned with Schema.Latest {
val firstName: String
val lastName: String
val age: Option[Int]
override type LatestCaseClass = DocumentV1
}
case class DocumentV1(firstName: String, lastName: String, age: Option[Int] = None) extends Document {
override def migrate: Document = this
override def caseClass = this
}
override val batchUpdates = Seq(
BatchUpdate("clearAge", _.caseClass.copy(age = None).latestTrait)
)
override implicit val versionedDecoder: Decoder[Versioned] = deriveConfiguredDecoder
override implicit val latestInterfaceEncoder: Encoder[Document] = deriveConfiguredEncoder
}
object IndexV2 extends Index {
override val name = IndexV1.name
override val latestVersion = latestVersionHelper[LatestCaseClass]
override val mappings = Seq(KeywordField("name"))
sealed trait Versioned extends Schema.Versioned {
override type LatestTrait = Document
}
sealed trait Document extends Versioned with Schema.Latest {
val firstName: String
val lastName: String
val fullName: String
override type LatestCaseClass = DocumentV2
}
case class DocumentV2(firstName: String, lastName: String, fullName: String) extends Document {
override def migrate = this
override def caseClass = this
}
case class DocumentV1(firstName: String, lastName: String, age: Option[Int] = None) extends Versioned {
override def migrate = DocumentV2(firstName, lastName, s"$firstName $lastName").migrate
}
override implicit val versionedDecoder: Decoder[Versioned] = deriveConfiguredDecoder
override implicit val latestInterfaceEncoder: Encoder[Document] = deriveConfiguredEncoder
}
}
abstract class AkkaTypedSpec extends ScalaTestWithActorTestKit(
// creating own ActorSystem to allow system.actorOf inside behavior (e.g. in elasticClient.publisher)
ActorSystem("scalatest", {
val debug = false
ConfigFactory.parseString(
if (debug)
"""
|akka {
| log-dead-letters = 10
| log-dead-letters-during-shutdown = on
| loglevel = "DEBUG"
|}
|""".stripMargin else "")
.withFallback(ConfigFactory.load())
}).toTyped) with AnyWordSpecLike {
implicit val classicActorSystem: ActorSystem = system.toClassic
implicit val executionContext: ExecutionContext = system.executionContext
def process() = Thread.sleep(3000)
implicit class RichFuture[T](future: Future[T]) {
def awaitResult(implicit duration: Duration = 3.seconds): T = Await.result(future, duration)
}
}
*/
// EvolverSpec resides in testkit now
12 changes: 12 additions & 0 deletions elastic/testkit/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%level %message%n%xException{10}</pattern>
</encoder>
</appender>

<root level="OFF">
<appender-ref ref="STDOUT"/>
</root>

</configuration>
Loading

0 comments on commit 1946aa8

Please sign in to comment.