fix bug in compactor
oalam committed Feb 3, 2021
1 parent b25b168 commit e3754b2
Showing 1 changed file with 108 additions and 94 deletions.
@@ -24,7 +24,7 @@ import scala.collection.mutable

class Compactor(val options: CompactorConf) extends Serializable with Runnable {

- private val logger = LogManager.getLogger(classOf[Compactor])
+ @transient lazy val logger = LogManager.getLogger(classOf[Compactor])
private var solrClient: CloudSolrClient = null
private var spark: SparkSession = null
private var scheduledThreadPool: ScheduledExecutorService = null
@@ -50,8 +50,8 @@ class Compactor(val options: CompactorConf) extends Serializable with Runnable {


/**
* Start the loop running the compaction algorithm
*/
// Need only one thread running the compaction
scheduledThreadPool = Executors.newScheduledThreadPool(1)
// Will the first algorithm run now or after a delay?
@@ -111,78 +111,71 @@ class Compactor(val options: CompactorConf) extends Serializable with Runnable {
def doCompact() = {
val days = findDaysToCompact()
days
// .filter(d => d == "2020-08-29")
.foreach(day => {
println(day)
logger.info(s"start processing day $day")
val uncompactedChunks = loadChunksFromSolr(day)
- val ids = getChunkIdList(uncompactedChunks)
+ val ids = getPreviouslyCompactedChunksIdList(uncompactedChunks)

val measuresDS = convertChunksToMeasures(uncompactedChunks)
val compactedChunksDS = convertMeasuresToChunks(measuresDS)

// uncompactedChunks.show(50, false)
// measuresDS.show(50, false)
// compactedChunksDS.show(50, false)


writeCompactedChunksToSolr(compactedChunksDS)
- deleteOldChunks( ids)
// printChunks(day)


/*
curl http://localhost:8983/solr/historian/update -H "Content-type: text/xml" --data-binary '<delete><query>chunk_origin:compactor*it*</query></delete>'
curl http://localhost:8983/solr/historian/update -H "Content-type: text/xml" --data-binary '<commit />'
*/
+ deleteOldChunks(day)
+ deleteOldChunksByIds(ids)
logger.info(s"done processing day $day")
})
}

- def getChunkIdList(chunks:Dataset[Chunk]) : List[(String,String)] = {
+ def getPreviouslyCompactedChunksIdList(chunks: Dataset[Chunk]) = {

import chunks.sparkSession.implicits._

- chunks.select(FIELD_ID, FIELD_ORIGIN)
-   .map(r => ( r.getAs[String](FIELD_ID), r.getAs[String](FIELD_ORIGIN)))
+ val origin = options.chunkyfier.origin
+ chunks.select(FIELD_ID,FIELD_ORIGIN)
+   .filter( r => r.getAs[String](FIELD_ORIGIN) == origin )
+   .map(r => r.getAs[String](FIELD_ID))
.collect()
.toList
}

/**
* Get old documents whose chunk origin is not compactor, starting from yesterday
*
* Prepare query that gets documents (and operator):
* - from epoch since yesterday (included)
* - with origin not from compactor (chunks not already compacted and thus needing to be)
* Return only interesting fields:
* - metric_key
* - chunk_day
* Documentation for query parameters of spark-solr: https://github.com/lucidworks/spark-solr#query-parameters
* Example:
*
* chunk_start:[* TO 1600387199999]
* AND -chunk_origin:compactor
* fields metric_key,chunk_day
* rows 1000
* request_handler /export
*
* @return the list of days formatted as a string like "yyyy-MM-dd"
*/
def findDaysToCompact() = {
val days = mutable.HashSet[String]()

// build SolR facet query
val solrQuery = new SolrQuery("*:*")
val filterQuery = if (options.reader.queryFilters.isEmpty)
s"$SOLR_COLUMN_START :[* TO ${DateUtil.utcFirstTimestampOfTodayMs - 1L} ]"
s"$SOLR_COLUMN_START:[* TO ${DateUtil.utcFirstTimestampOfTodayMs - 1L}] AND " +
s"-$SOLR_COLUMN_ORIGIN:${options.chunkyfier.origin}"
else
s"$SOLR_COLUMN_START :[* TO ${DateUtil.utcFirstTimestampOfTodayMs - 1L} ] AND (${options.reader.queryFilters})"
s"$SOLR_COLUMN_START:[* TO ${DateUtil.utcFirstTimestampOfTodayMs - 1L}] AND " +
s"-$SOLR_COLUMN_ORIGIN:${options.chunkyfier.origin} AND " +
s"${options.reader.queryFilters}"

solrQuery.addFilterQuery(filterQuery)
solrQuery.setRows(0)
solrQuery.addFacetField(SOLR_COLUMN_DAY)
solrQuery.setFacet(true)
solrQuery.setFacetMinCount(1)
logger.debug(s"looking for days to compact : fq=$filterQuery")
logger.info(s"Solr query looking for days to compact : fq=$filterQuery")

// run that query and convert response to a set of days
val response = solrClient.query(options.solr.collectionName, solrQuery)
@@ -192,25 +185,25 @@ class Compactor(val options: CompactorConf) extends Serializable with Runnable {
}

if (days.isEmpty)
logger.debug("no chunk found for compaction")
logger.info("no chunk found for compaction")
else
logger.debug("found " + days.toList.mkString(",") + " to be compacted")
logger.info("found " + days.toList.mkString(",") + " to be compacted")
days
}
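For reference, here is a minimal standalone sketch (not part of the commit) of the facet query that the new filter builds. It assumes a SolrJ CloudSolrClient is already configured, uses the field names quoted in the doc comment above (chunk_start, chunk_origin, chunk_day), and hard-codes "compactor" where the real code reads options.chunkyfier.origin; the helper name daysToCompact is hypothetical.

import org.apache.solr.client.solrj.SolrQuery
import org.apache.solr.client.solrj.impl.CloudSolrClient
import scala.collection.JavaConverters._

// Sketch: collect the distinct chunk_day values of chunks that still need compaction.
def daysToCompact(client: CloudSolrClient, collection: String, firstTimestampOfTodayMs: Long): Set[String] = {
  val q = new SolrQuery("*:*")
  // keep chunks strictly older than today that were not already produced by the compactor
  q.addFilterQuery(s"chunk_start:[* TO ${firstTimestampOfTodayMs - 1L}] AND -chunk_origin:compactor")
  q.setRows(0)           // facet counts only, no documents
  q.setFacet(true)
  q.addFacetField("chunk_day")
  q.setFacetMinCount(1)
  val response = client.query(collection, q)
  response.getFacetField("chunk_day").getValues.asScala.map(_.getName).toSet
}

The real method additionally appends options.reader.queryFilters to the filter query when it is non-empty, as the hunk above shows.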

/**
* get all chunks from the day
*
* @TODO can be optimized to remove all those that do not need any compaction
* @return the chunks dataset
*/
def loadChunksFromSolr(day: String) = {
logger.debug(s"start loading chunks from SolR collection : ${options.solr.collectionName} for day $day")
logger.info(s"start loading chunks from SolR collection : ${options.solr.collectionName} for day $day")

val filterQuery = if (options.reader.queryFilters.isEmpty)
s"$SOLR_COLUMN_DAY:$day"
else
s"$SOLR_COLUMN_DAY:$day AND (${options.reader.queryFilters})"
s"$SOLR_COLUMN_DAY:$day AND ${options.reader.queryFilters}"

ReaderFactory.getChunksReader(ReaderType.SOLR)
.read(sql.Options(
@@ -225,22 +218,22 @@ class Compactor(val options: CompactorConf) extends Serializable with Runnable {
}

/**
* convert a dataset of chunks into a dataset of measures
*
* @param chunksDS
* @return the dataset of measures
*/
def convertChunksToMeasures(chunksDS: Dataset[Chunk]): Dataset[Measure] = {
new UnChunkyfier().transform(chunksDS)
.as[Measure](Encoders.bean(classOf[Measure]))
}

/**
* transform Measures into Chunks
*
* @param measuresDS input dataset
* @return chunk dataset
*/
def convertMeasuresToChunks(measuresDS: Dataset[Measure]): Dataset[Chunk] = {
new Chunkyfier()
.setOrigin(options.chunkyfier.origin)
@@ -253,10 +246,10 @@ class Compactor(val options: CompactorConf) extends Serializable with Runnable {
}

/**
* save the new chunks to SolR
*
* @param chunksDS
*/
def writeCompactedChunksToSolr(chunksDS: Dataset[Chunk]) = {
// write chunks to SolR
WriterFactory.getChunksWriter(WriterType.SOLR)
@@ -268,16 +261,16 @@ class Compactor(val options: CompactorConf) extends Serializable with Runnable {

// explicit commit to make sure all docs are immediately visible
val response = solrClient.commit(options.solr.collectionName)
logger.debug(s"done saving new chunks to collection ${options.solr.collectionName}")
logger.info(s"done saving new chunks to collection ${options.solr.collectionName}")

chunksDS
}


/**
*
* @return
*/
def checkChunksIntegrity(chunksDS: Dataset[Chunk]) = {
val q = new SolrQuery("*:*")
val response = solrClient.query(options.solr.collectionName, q)
@@ -313,23 +306,20 @@ class Compactor(val options: CompactorConf) extends Serializable with Runnable {
import collection.JavaConverters._

/**
- * remove all previous chunks
- *
- * @return
- */
- def deleteOldChunks(ids:List[(String,String)]) = {


- // println(s"-----------delete ---------")
- for (id <- ids) {
-   logger.debug("Deleting document id " + id)
-   // println(s"id=$id")
-   try solrClient.deleteById(options.solr.collectionName, id._1)
-   catch {
-     case e: Exception =>
-       logger.error("Error deleting chunk document with id " + id._1 + ": " + e.getMessage)
-   }
- }
+ * remove all previous chunks
+ *
+ * @return
+ */
+ def deleteOldChunks(day: String /*ids:List[(String,String)]*/) = {

+ // build SolR delete query
+ val query = if (options.reader.queryFilters.isEmpty)
+   s"$SOLR_COLUMN_DAY:$day AND -$SOLR_COLUMN_ORIGIN:${options.chunkyfier.origin}"
+ else
+   s"$SOLR_COLUMN_DAY:$day AND -$SOLR_COLUMN_ORIGIN:${options.chunkyfier.origin} AND ${options.reader.queryFilters}"

+ logger.info(s"will delete by query the following matching chunks : $query")
+ solrClient.deleteByQuery(options.solr.collectionName, query)

try {
logger.debug("Committing documents deletion")
@@ -338,7 +328,31 @@ class Compactor(val options: CompactorConf) extends Serializable with Runnable {
case e: Exception =>
logger.error("Error committing deleted chunks: " + e.getMessage)
}
logger.info("Old chunks successfully deleted")
}


+ def deleteOldChunksByIds(ids: List[String]) = {

+ // logger.info(s"start deletion of old chunks from origin ${options.chunkyfier.origin}")
+ ids.foreach( id => {
+   logger.debug("Deleting document id " + id)
+   try solrClient.deleteById(options.solr.collectionName, id)
+   catch {
+     case e: Exception =>
+       logger.error("Error deleting chunk document with id " + id + ": " + e.getMessage)
+   }
+ })


+ try {
+   logger.debug("Committing documents deletion")
+   solrClient.commit(options.solr.collectionName, true, true)
+ } catch {
+   case e: Exception =>
+     logger.error("Error committing deleted chunks: " + e.getMessage)
+ }
+ logger.info("Old chunks successfully deleted")
}

}
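For context, a short sketch (again not part of the commit) of the delete-by-query plus explicit-commit pattern that the reworked deleteOldChunks above relies on. It assumes a configured SolrJ CloudSolrClient, reuses the chunk_day and chunk_origin field names from the surrounding code, and hard-codes "compactor" where the real code reads options.chunkyfier.origin; the helper name deleteUncompactedChunks is hypothetical.

import org.apache.solr.client.solrj.impl.CloudSolrClient

// Sketch: drop every uncompacted chunk of a given day in a single request
// instead of issuing one deleteById call per chunk.
def deleteUncompactedChunks(client: CloudSolrClient, collection: String, day: String): Unit = {
  val query = s"chunk_day:$day AND -chunk_origin:compactor"
  client.deleteByQuery(collection, query)
  // explicit commit so the deletions become visible immediately
  client.commit(collection, true, true)
}

Compared with the removed per-id loop, one delete-by-query covers all uncompacted chunks of the day, while the new deleteOldChunksByIds still removes the chunks the compactor itself wrote on a previous run.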
@@ -347,11 +361,11 @@ object Compactor {


/**
*
* $SPARK_HOME/bin/spark-submit --driver-memory 4g --driver-java-options '-Dlog4j.configuration=file:historian-resources/conf/log4j.properties' --class com.hurence.historian.spark.compactor.Compactor --jars historian-resources/jars/spark-solr-3.6.6-shaded.jar,historian-spark/target/historian-spark-1.3.6.jar historian-spark/target/historian-spark-1.3.6.jar --config-file historian-resources/conf/compactor-config.yaml
*
* @param args
*/
def main(args: Array[String]): Unit = {
// get arguments & setup spark session
val options = if (args.size == 0)
