Skip to content

Commit

Permalink
fix concurrent tests
Browse files Browse the repository at this point in the history
  • Loading branch information
an-tex committed Nov 15, 2023
1 parent d3547f7 commit acf954c
Showing 1 changed file with 22 additions and 24 deletions.
46 changes: 22 additions & 24 deletions elastic/testkit/src/test/scala/EvolverSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import akka.actor.typed.scaladsl.adapter.TypedActorSystemOps
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl.{RichFuture => _, _}
import com.sksamuel.elastic4s.akka.{AkkaHttpClient, AkkaHttpClientSettings}
import com.sksamuel.elastic4s.circe._
import com.sksamuel.elastic4s.fields.{ElasticField, KeywordField, TextField}
import com.sksamuel.elastic4s.handlers.index.Field
import com.softwaremill.macwire.wireSet
import io.circe.{Codec, Json}
import io.circe.generic.extras.semiauto._
import io.circe.syntax.EncoderOps
import io.circe.parser._
import io.circe.syntax.EncoderOps
import io.circe.{Codec, Json}
import net.sc8s.akka.components.testkit.ClusterComponentTestKit
import net.sc8s.elastic.Evolver.Command.{EvolveDocuments, MigrateIndices, RunBatchUpdates}
import net.sc8s.elastic.Index.BatchUpdate
Expand All @@ -19,12 +20,12 @@ import net.sc8s.logstage.elastic.Logging
import org.scalatest.Inspectors._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import com.sksamuel.elastic4s.circe._

import java.time.LocalDateTime
import java.time.temporal.ChronoUnit
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.chaining.scalaUtilChainingOps

class EvolverSpec extends ScalaTestWithActorTestKit(Evolver.serializers) with AnyWordSpecLike with Matchers with Logging with ClusterComponentTestKit {

Expand All @@ -33,7 +34,7 @@ class EvolverSpec extends ScalaTestWithActorTestKit(Evolver.serializers) with An
implicit lazy val indexSetup: IndexSetup = IndexSetup(
elasticClient,
system,
None,
Some("evolver"),
refreshImmediately = true
)

Expand All @@ -44,12 +45,14 @@ class EvolverSpec extends ScalaTestWithActorTestKit(Evolver.serializers) with An

val elasticIndices: Set[Index] = wireSet

def aliases() =
elasticClient.execute(catAliases()).futureValue.result.filter(_.alias.pipe(elasticIndices.map(_.name).contains))

"Evolver" should {
"create indices, mappings and aliases" in new EvolverContext {
eventually {
val response = elasticClient.execute(catAliases()).futureValue
response.result.map(_.alias) should contain theSameElementsAs elasticIndices.map(_.name)
forAll(response.result.map(_.index)) { index =>
aliases().map(_.alias) should contain theSameElementsAs elasticIndices.map(_.name)
forAll(aliases().map(_.index)) { index =>
val _index :: dateTime :: Nil = index.split("-", 2).toList
elasticIndices.map(_.name) should contain(_index)

Expand All @@ -63,17 +66,13 @@ class EvolverSpec extends ScalaTestWithActorTestKit(Evolver.serializers) with An
)
}
"keep existing aliases and indices" in new EvolverContext {
val existingAliases = eventually {
val response = elasticClient.execute(catAliases()).futureValue
response.result should have length elasticIndices.size
response.result
}
val existingAliases = aliases()

evolver ! MigrateIndices(elasticIndices.map(_.name).toSeq, forceReindex = false)
evolver ! MigrateIndices(Nil, forceReindex = false)

Thread.sleep(3000)

elasticClient.execute(catAliases()).futureValue.result shouldBe existingAliases
aliases() shouldBe existingAliases
}

def addDocumentV2() = {
Expand Down Expand Up @@ -131,7 +130,6 @@ class EvolverSpec extends ScalaTestWithActorTestKit(Evolver.serializers) with An
// }
//}
"skip migration of mappings" in new EvolverContext(testIndex1) {
eventually(elasticClient.execute(catAliases()).futureValue.result should not be empty)
elasticClient.execute(putMapping(testIndex1.name) properties Seq(KeywordField("deleted"))).futureValue
evolver ! MigrateIndices(Nil, forceReindex = false)

Expand All @@ -140,8 +138,6 @@ class EvolverSpec extends ScalaTestWithActorTestKit(Evolver.serializers) with An
elasticClient.execute(getIndex(testIndex1.name)).futureValue.result.head._2.mappings.properties === Map("name" -> Field(Some("keyword")), "deleted" -> Field(Some("keyword")))
}
"evolve documents with older version" in new EvolverContext(indexV1) {
eventually(elasticClient.execute(catAliases()).futureValue.result should not be empty)

val indexV2 = new EvolverSpec.IndexV2()

val document1_V1 = indexV1.DocumentV1("id1", "first1", "last1", Some(1))
Expand Down Expand Up @@ -174,9 +170,6 @@ class EvolverSpec extends ScalaTestWithActorTestKit(Evolver.serializers) with An
))
}
"run batch updates" in new EvolverContext(indexV1) {
evolver ! MigrateIndices(Nil, forceReindex = false)
eventually(elasticClient.execute(catAliases()).futureValue.result should not be empty)

import indexV1.codec

val document = indexV1.DocumentV1("id1", "first1", "last1", Some(1))
Expand All @@ -191,22 +184,27 @@ class EvolverSpec extends ScalaTestWithActorTestKit(Evolver.serializers) with An
}
}

private def deleteAllIndicesAndAliases() =
private def deleteAllIndicesAndAliases() = {
elasticClient
.execute(catAliases()).flatMap(response => Future.sequence(
response.result.map(aliasResponse => elasticClient.execute(removeAlias(aliasResponse.alias, aliasResponse.index)))
response.result.filter(_.alias.pipe(elasticIndices.map(_.name).contains)).map(aliasResponse => elasticClient.execute(removeAlias(aliasResponse.alias, aliasResponse.index)))
))
.flatMap(_ =>
elasticClient.execute(catIndices()).flatMap(response => if (response.result.nonEmpty) elasticClient.execute(deleteIndex(response.result.map(_.index): _*)) else Future.successful())
elasticClient.execute(catIndices()).flatMap(response => if (response.result.nonEmpty) elasticClient.execute(deleteIndex(response.result.filter(_.index.pipe(index => elasticIndices.map(_.name).exists(index.startsWith))).map(_.index): _*)) else Future.successful())
)
.futureValue
}

class EvolverContext(indices: Index*) {
private def allOrCustomIndices(indices: Seq[Index]) = if (indices.isEmpty) elasticIndices else indices.toSet
deleteAllIndicesAndAliases()

def spawnEvolver(indices: Index*) = spawnComponent(Evolver)(new Evolver.Component(elasticClient, if (indices.isEmpty) elasticIndices else indices.toSet))
def spawnEvolver(indices: Index*) = spawnComponent(Evolver)(new Evolver.Component(elasticClient, allOrCustomIndices(indices)))

val evolver = spawnEvolver(indices: _ *)

// wait for automatic migration to finish
eventually(aliases().map(_.alias) should contain theSameElementsAs allOrCustomIndices(indices).map(_.name))
}
}

Expand Down

0 comments on commit acf954c

Please sign in to comment.