diff --git a/elastic/testkit/src/test/scala/EvolverSpec.scala b/elastic/testkit/src/test/scala/EvolverSpec.scala index f19b5d9..3c6f1e3 100644 --- a/elastic/testkit/src/test/scala/EvolverSpec.scala +++ b/elastic/testkit/src/test/scala/EvolverSpec.scala @@ -4,13 +4,14 @@ import akka.actor.typed.scaladsl.adapter.TypedActorSystemOps import com.sksamuel.elastic4s.ElasticClient import com.sksamuel.elastic4s.ElasticDsl.{RichFuture => _, _} import com.sksamuel.elastic4s.akka.{AkkaHttpClient, AkkaHttpClientSettings} +import com.sksamuel.elastic4s.circe._ import com.sksamuel.elastic4s.fields.{ElasticField, KeywordField, TextField} import com.sksamuel.elastic4s.handlers.index.Field import com.softwaremill.macwire.wireSet -import io.circe.{Codec, Json} import io.circe.generic.extras.semiauto._ -import io.circe.syntax.EncoderOps import io.circe.parser._ +import io.circe.syntax.EncoderOps +import io.circe.{Codec, Json} import net.sc8s.akka.components.testkit.ClusterComponentTestKit import net.sc8s.elastic.Evolver.Command.{EvolveDocuments, MigrateIndices, RunBatchUpdates} import net.sc8s.elastic.Index.BatchUpdate @@ -19,12 +20,12 @@ import net.sc8s.logstage.elastic.Logging import org.scalatest.Inspectors._ import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike -import com.sksamuel.elastic4s.circe._ import java.time.LocalDateTime import java.time.temporal.ChronoUnit import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future +import scala.util.chaining.scalaUtilChainingOps class EvolverSpec extends ScalaTestWithActorTestKit(Evolver.serializers) with AnyWordSpecLike with Matchers with Logging with ClusterComponentTestKit { @@ -33,7 +34,7 @@ class EvolverSpec extends ScalaTestWithActorTestKit(Evolver.serializers) with An implicit lazy val indexSetup: IndexSetup = IndexSetup( elasticClient, system, - None, + Some("evolver"), refreshImmediately = true ) @@ -44,12 +45,14 @@ class EvolverSpec extends ScalaTestWithActorTestKit(Evolver.serializers) with An val elasticIndices: Set[Index] = wireSet + def aliases() = + elasticClient.execute(catAliases()).futureValue.result.filter(_.alias.pipe(elasticIndices.map(_.name).contains)) + "Evolver" should { "create indices, mappings and aliases" in new EvolverContext { eventually { - val response = elasticClient.execute(catAliases()).futureValue - response.result.map(_.alias) should contain theSameElementsAs elasticIndices.map(_.name) - forAll(response.result.map(_.index)) { index => + aliases().map(_.alias) should contain theSameElementsAs elasticIndices.map(_.name) + forAll(aliases().map(_.index)) { index => val _index :: dateTime :: Nil = index.split("-", 2).toList elasticIndices.map(_.name) should contain(_index) @@ -63,17 +66,13 @@ class EvolverSpec extends ScalaTestWithActorTestKit(Evolver.serializers) with An ) } "keep existing aliases and indices" in new EvolverContext { - val existingAliases = eventually { - val response = elasticClient.execute(catAliases()).futureValue - response.result should have length elasticIndices.size - response.result - } + val existingAliases = aliases() - evolver ! MigrateIndices(elasticIndices.map(_.name).toSeq, forceReindex = false) + evolver ! MigrateIndices(Nil, forceReindex = false) Thread.sleep(3000) - elasticClient.execute(catAliases()).futureValue.result shouldBe existingAliases + aliases() shouldBe existingAliases } def addDocumentV2() = { @@ -131,7 +130,6 @@ class EvolverSpec extends ScalaTestWithActorTestKit(Evolver.serializers) with An // } //} "skip migration of mappings" in new EvolverContext(testIndex1) { - eventually(elasticClient.execute(catAliases()).futureValue.result should not be empty) elasticClient.execute(putMapping(testIndex1.name) properties Seq(KeywordField("deleted"))).futureValue evolver ! MigrateIndices(Nil, forceReindex = false) @@ -140,8 +138,6 @@ class EvolverSpec extends ScalaTestWithActorTestKit(Evolver.serializers) with An elasticClient.execute(getIndex(testIndex1.name)).futureValue.result.head._2.mappings.properties === Map("name" -> Field(Some("keyword")), "deleted" -> Field(Some("keyword"))) } "evolve documents with older version" in new EvolverContext(indexV1) { - eventually(elasticClient.execute(catAliases()).futureValue.result should not be empty) - val indexV2 = new EvolverSpec.IndexV2() val document1_V1 = indexV1.DocumentV1("id1", "first1", "last1", Some(1)) @@ -174,9 +170,6 @@ class EvolverSpec extends ScalaTestWithActorTestKit(Evolver.serializers) with An )) } "run batch updates" in new EvolverContext(indexV1) { - evolver ! MigrateIndices(Nil, forceReindex = false) - eventually(elasticClient.execute(catAliases()).futureValue.result should not be empty) - import indexV1.codec val document = indexV1.DocumentV1("id1", "first1", "last1", Some(1)) @@ -191,22 +184,27 @@ class EvolverSpec extends ScalaTestWithActorTestKit(Evolver.serializers) with An } } - private def deleteAllIndicesAndAliases() = + private def deleteAllIndicesAndAliases() = { elasticClient .execute(catAliases()).flatMap(response => Future.sequence( - response.result.map(aliasResponse => elasticClient.execute(removeAlias(aliasResponse.alias, aliasResponse.index))) + response.result.filter(_.alias.pipe(elasticIndices.map(_.name).contains)).map(aliasResponse => elasticClient.execute(removeAlias(aliasResponse.alias, aliasResponse.index))) )) .flatMap(_ => - elasticClient.execute(catIndices()).flatMap(response => if (response.result.nonEmpty) elasticClient.execute(deleteIndex(response.result.map(_.index): _*)) else Future.successful()) + elasticClient.execute(catIndices()).flatMap(response => if (response.result.nonEmpty) elasticClient.execute(deleteIndex(response.result.filter(_.index.pipe(index => elasticIndices.map(_.name).exists(index.startsWith))).map(_.index): _*)) else Future.successful()) ) .futureValue + } class EvolverContext(indices: Index*) { + private def allOrCustomIndices(indices: Seq[Index]) = if (indices.isEmpty) elasticIndices else indices.toSet deleteAllIndicesAndAliases() - def spawnEvolver(indices: Index*) = spawnComponent(Evolver)(new Evolver.Component(elasticClient, if (indices.isEmpty) elasticIndices else indices.toSet)) + def spawnEvolver(indices: Index*) = spawnComponent(Evolver)(new Evolver.Component(elasticClient, allOrCustomIndices(indices))) val evolver = spawnEvolver(indices: _ *) + + // wait for automatic migration to finish + eventually(aliases().map(_.alias) should contain theSameElementsAs allOrCustomIndices(indices).map(_.name)) } }