Skip to content

Commit

Permalink
Updating grpc api to newest version (draft)
Browse files Browse the repository at this point in the history
  • Loading branch information
silvanheller committed Aug 20, 2020
1 parent dbbbc60 commit c5d1c6d
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 69 deletions.
2 changes: 1 addition & 1 deletion cineast-api/cineast-proto
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
package org.vitrivr.cineast.api.grpc;

import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.vitrivr.cineast.api.grpc.data.QueryStage;
import org.vitrivr.cineast.api.grpc.data.QueryTerm;
import org.vitrivr.cineast.api.grpc.util.MediaObjectUtil;
import org.vitrivr.cineast.api.grpc.util.MediaSegmentUtil;
import org.vitrivr.cineast.api.grpc.util.QueryContainerUtil;
import org.vitrivr.cineast.api.messages.query.StagedSimilarityQuery;
import org.vitrivr.cineast.api.util.QueryUtil;
import org.vitrivr.cineast.core.config.QueryConfig;
import org.vitrivr.cineast.core.config.ReadableQueryConfig;
Expand All @@ -14,6 +22,8 @@
import org.vitrivr.cineast.core.data.entities.MediaObjectMetadataDescriptor;
import org.vitrivr.cineast.core.data.entities.MediaSegmentDescriptor;
import org.vitrivr.cineast.core.data.entities.MediaSegmentMetadataDescriptor;
import org.vitrivr.cineast.core.data.query.containers.QueryContainer;
import org.vitrivr.cineast.core.data.score.SegmentScoreElement;
import org.vitrivr.cineast.core.db.dao.reader.MediaObjectMetadataReader;
import org.vitrivr.cineast.core.db.dao.reader.MediaObjectReader;
import org.vitrivr.cineast.core.db.dao.reader.MediaSegmentMetadataReader;
Expand All @@ -33,6 +43,8 @@ public class CineastQueryService extends CineastQueryGrpc.CineastQueryImplBase {

private final ContinuousRetrievalLogic continuousRetrievalLogic;

private static final Logger LOGGER = LogManager.getLogger();

public CineastQueryService(ContinuousRetrievalLogic continuousRetrievalLogic) {
this.continuousRetrievalLogic = continuousRetrievalLogic;
}
Expand Down Expand Up @@ -114,114 +126,223 @@ public void getMediaSegmentScores(CineastGrpc.Query query, StreamObserver<Cineas
responseObserver.onCompleted();
}

//TODO This has enormous code duplication with the TemporalQueryMessageHandler

@Override
public void getSimilar(CineastGrpc.Query query, StreamObserver<CineastGrpc.QueryResult> responseObserver) {
Set<String> sentSegmentIds = new HashSet<>(), sentObjectIds = new HashSet<>();
public void getSimilar(CineastGrpc.TemporalQuery query, StreamObserver<CineastGrpc.QueryResult> responseObserver) {
StopWatch watch = StopWatch.createStarted();

MediaSegmentReader mediaSegmentReader = new MediaSegmentReader(Config.sharedConfig().getDatabase().getSelectorSupplier().get());
MediaObjectReader mediaObjectReader = new MediaObjectReader(Config.sharedConfig().getDatabase().getSelectorSupplier().get());
MediaSegmentMetadataReader segmentMetadataReader = new MediaSegmentMetadataReader(Config.sharedConfig().getDatabase().getSelectorSupplier().get());
MediaObjectMetadataReader objectMetadataReader = new MediaObjectMetadataReader(Config.sharedConfig().getDatabase().getSelectorSupplier().get());

List<QueryStage> stages = QueryContainerUtil.query(query);
Set<String> sentSegmentIds = new HashSet<>(), sentObjectIds = new HashSet<>();

HashSet<String> relevantSegments = new HashSet<>();
CineastGrpc.QueryConfig config = query.getQueryList().get(0).getConfig();
ReadableQueryConfig rqconf = QueryContainerUtil.queryConfig(config);
QueryConfig qconf = new QueryConfig(rqconf);

stages:
for (int i = 0; i < stages.size(); ++i) {
/* Prepare QueryConfig (so as to obtain a QueryId). */
final String uuid = qconf.getQueryId().toString();
final int max = qconf.getMaxResults().orElse(Config.sharedConfig().getRetriever().getMaxResults());
qconf.setMaxResults(max);
final int resultsPerModule = qconf.getRawResultsPerModule() == -1 ? Config.sharedConfig().getRetriever().getMaxResultsPerModule() : qconf.getResultsPerModule();
qconf.setResultsPerModule(resultsPerModule);

QueryStage stage = stages.get(i);
boolean lastStage = i == stages.size() - 1;
List<Thread> metadataRetrievalThreads = new ArrayList<>();

List<QueryTerm> terms = stage.getQueryTerms();
QueryConfig stageConfig = QueryConfig.clone(stage.getQueryConfig());
stageConfig.addRelevantSegmentIds(relevantSegments);
relevantSegments.clear();
/* We iterate over all components independently, because they have a temporal context.*/
for (int containerIdx = 0; containerIdx < query.getQueryCount(); containerIdx++) {
List<QueryStage> stages = QueryContainerUtil.query(query.getQueryList().get(containerIdx));

for (QueryTerm term : terms) {
/* We make a new stagedQueryConfig per stage because the relevant segments will differ for each stage. This also resets the filter (relevant ids in the config)*/
QueryConfig stageQConf = QueryConfig.clone(qconf);

ReadableQueryConfig queryConfig = stageConfig.withChangesFrom(term.getQueryConfig());
/* For the first stage, there will be no relevant segments when querying. This is ok because the retrieval engine handles this appropriately */
HashSet<String> relevantSegments = new HashSet<>();

for (String category : term.getCategories()) {
List<StringDoublePair> results = QueryUtil.retrieve(continuousRetrievalLogic, term.getContainer(), queryConfig, category);
/* Store for each queryterm per category all results to be sent at a later time */
List<Map<String, List<StringDoublePair>>> cache = new ArrayList<>();

if (!lastStage) {
/* For the terms of a stage, ordering matters. The assumption is that each term is used as a filter for its successor */
for (int stageIndex = 0; stageIndex < stages.size(); stageIndex++) {
/* Initalize stage with this hashmap */
cache.add(stageIndex, new HashMap<>());

if (results.isEmpty()) { //no more results left, later stages can be ignored
break stages;
}
QueryStage stage = stages.get(stageIndex);

results.stream().forEach(x -> relevantSegments.add(x.key));
continue;
}
List<Thread> qtThreads = new ArrayList<>();

/* We now iterate over all QueryTerms for this stage, simply adding their results to the list of relevant segments for the next querystage.
* The list is only updated once we've iterated over all terms
*/
for (int i = 0; i < stage.getQueryTerms().size(); i++) {
QueryTerm qt = stage.getQueryTerms().get(i);

responseObserver.onNext(
QueryContainerUtil.queryResult(
QueryContainerUtil.similarityQueryResult(
term.getQueryConfig().getQueryId().toString(),
final int finalContainerIdx = containerIdx;
final int finalStageIndex = stageIndex;
Thread qtRetrievalThread = new Thread(() -> {

/* Prepare QueryTerm and perform sanity-checks */
if (qt == null) {
/* In rare instances, it is possible to have null as query stage. If this happens to you, please report this to the developers so we can try to fix it. */
LOGGER.warn("QueryTerm was null for stage {}", stage);
return;
}
QueryContainer qc = qt.getContainer();
if (qc == null) {
LOGGER.warn("Likely an empty query, as it could not be converted to a query container. Ignoring it");
return;
}
qc.setContainerId(finalContainerIdx);

List<Thread> categoryThreads = new ArrayList<>();

/* For each category of a specific queryterm, we actually go and retrieve. Be aware that we do not change the relevant ids after this call */
for (String category : qt.getCategories()) {
/* Merge partial results with score-map */
List<SegmentScoreElement> scores = continuousRetrievalLogic.retrieve(qc, category, stageQConf);

/* Transform raw results into list of StringDoublePairs (segmentId -> score) */
final List<StringDoublePair> results = scores.stream()
.map(elem -> new StringDoublePair(elem.getSegmentId(), elem.getScore()))
.filter(p -> p.value > 0d)
.sorted(StringDoublePair.COMPARATOR)
.limit(max)
.collect(Collectors.toList());

if (results.isEmpty()) {
LOGGER.warn("No results found for category {} and qt {} in stage with id {}. Full compoment: {}", category, qt, finalContainerIdx, stage);
}
if (cache.get(finalStageIndex).containsKey(category)) {
LOGGER.error("Category {} was used twice in stage {}. This erases the results of the previous category... ", category, finalStageIndex);
}
cache.get(finalStageIndex).put(category, results);
results.forEach(res -> relevantSegments.add(res.key));
LOGGER.trace("Category {} at stage {} executed @ {} ms", category, finalStageIndex, watch.getTime(TimeUnit.MILLISECONDS));

/* If this is the last stage, we can send relevant results per category back to the UI.
* Otherwise, we cannot since we might send results to the UI which would be filtered at a later stage
*/
if (finalStageIndex == stages.size() - 1) {
/* Finalize and submit per-container results */
responseObserver.onNext(
QueryContainerUtil.queryResult(
QueryContainerUtil.similarityQueryResult(
qt.getQueryConfig().getQueryId().toString(),
category,
results
)));
)));

List<String> segmentIds = results.stream().map(x -> x.key).filter(x -> !sentSegmentIds.contains(x)).collect(Collectors.toList());
if (segmentIds.isEmpty()) {
continue;
}
List<String> segmentIds = results.stream().map(x -> x.key).filter(x -> !sentSegmentIds.contains(x)).collect(Collectors.toList());
if (segmentIds.isEmpty()) {
continue;
}

Map<String, MediaSegmentDescriptor> segments = mediaSegmentReader.lookUpSegments(segmentIds);
Map<String, MediaSegmentDescriptor> segments = mediaSegmentReader.lookUpSegments(segmentIds);

responseObserver.onNext(
QueryContainerUtil.queryResult(
CineastGrpc.MediaSegmentQueryResult.newBuilder().addAllSegments(
responseObserver.onNext(
QueryContainerUtil.queryResult(
CineastGrpc.MediaSegmentQueryResult.newBuilder().addAllSegments(
segments.values().stream().map(MediaSegmentUtil::fromMediaSegmentDescriptor).collect(Collectors.toList())
).build()
)
);

List<MediaSegmentMetadataDescriptor> segmentMetaData = segmentMetadataReader.lookupMultimediaMetadata(segmentIds);
responseObserver.onNext(
QueryContainerUtil.queryResult(
CineastGrpc.MediaSegmentMetaDataQueryResult.newBuilder().addAllSegmentMetaData(
).build()
)
);

List<MediaSegmentMetadataDescriptor> segmentMetaData = segmentMetadataReader.lookupMultimediaMetadata(segmentIds);
responseObserver.onNext(
QueryContainerUtil.queryResult(
CineastGrpc.MediaSegmentMetaDataQueryResult.newBuilder().addAllSegmentMetaData(
segmentMetaData.stream().map(QueryContainerUtil::mediaSegmentMetaData).collect(Collectors.toList())
).build()
)
);
).build()
)
);

sentSegmentIds.addAll(segmentIds);
sentSegmentIds.addAll(segmentIds);

List<String> objectIds = segments.values().stream().map(MediaSegmentDescriptor::getObjectId).filter(x -> !sentObjectIds.contains(x)).collect(Collectors.toList());
if (objectIds.isEmpty()) {
continue;
}
Map<String, MediaObjectDescriptor> objects = mediaObjectReader.lookUpObjects(objectIds);
List<String> objectIds = segments.values().stream().map(MediaSegmentDescriptor::getObjectId).filter(x -> !sentObjectIds.contains(x)).collect(Collectors.toList());
if (objectIds.isEmpty()) {
continue;
}
Map<String, MediaObjectDescriptor> objects = mediaObjectReader.lookUpObjects(objectIds);

responseObserver.onNext(
QueryContainerUtil.queryResult(
CineastGrpc.MediaObjectQueryResult.newBuilder().addAllObjects(
responseObserver.onNext(
QueryContainerUtil.queryResult(
CineastGrpc.MediaObjectQueryResult.newBuilder().addAllObjects(
objects.values().stream().map(MediaObjectUtil::fromMediaObjectDescriptor).collect(Collectors.toList())
).build()
)
);

List<MediaObjectMetadataDescriptor> objectMetaData = objectMetadataReader.lookupMultimediaMetadata(objectIds);
responseObserver.onNext(
QueryContainerUtil.queryResult(
CineastGrpc.MediaObjectMetaDataQueryResult.newBuilder().addAllObjectMetaData(
).build()
)
);

List<MediaObjectMetadataDescriptor> objectMetaData = objectMetadataReader.lookupMultimediaMetadata(objectIds);
responseObserver.onNext(
QueryContainerUtil.queryResult(
CineastGrpc.MediaObjectMetaDataQueryResult.newBuilder().addAllObjectMetaData(
objectMetaData.stream().map(QueryContainerUtil::mediaObjectMetaData).collect(Collectors.toList())
).build()
)
);
).build()
)
);

sentObjectIds.addAll(objectIds);

}
}
/* We're done for this querycontainer */
});
qtRetrievalThread.setName("qt-stage" + stageIndex + "-" + qt.getCategories()); //TODO Better name
qtThreads.add(qtRetrievalThread);
qtRetrievalThread.start();
}

sentObjectIds.addAll(objectIds);
for (Thread thread : qtThreads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

/* After we are done with a stage, we add all relevant segments to the config for the next stage. */
if (relevantSegments.size() == 0) {
LOGGER.warn("No relevant segments anymore, aborting staged querying");
/* Clear relevant segments (there are none) */
stageQConf.setRelevantSegmentIds(relevantSegments);
break;
}
stageQConf.setRelevantSegmentIds(relevantSegments);
relevantSegments.clear();
}

/* At this point, we have iterated over all stages. Now, we need to go back for all stages and send the results for the relevant ids. */
for (int stageIndex = 0; stageIndex < stages.size()-1; stageIndex++) {
cache.get(stageIndex).forEach((category, results) -> {
results.removeIf(pair -> !stageQConf.getRelevantSegmentIds().contains(pair.key));

responseObserver.onNext(
QueryContainerUtil.queryResult(
QueryContainerUtil.similarityQueryResult(
uuid, //TODO This assumes that all queries in a temporalquery have the same uuid
category,
results
)));
});
}

/* There should be no carry-over from this block since temporal queries are executed independently */
}

/* At this point, all StagedQueries have been executed for this TemporalQuery.
* Since results have always been sent for the final stage or, when appropriate, in intermediate steps, there's nothing left to do.
*/

responseObserver.onCompleted();

mediaSegmentReader.close();
mediaObjectReader.close();
segmentMetadataReader.close();
watch.stop();
LOGGER.debug("Query executed in {} ms", watch.getTime(TimeUnit.MILLISECONDS));

}

Expand Down

0 comments on commit c5d1c6d

Please sign in to comment.