-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CAY-1251] Introduce worker-side model cache #1252
Merged
Merged
Changes from all commits
Commits
Show all changes
24 commits
Select commit
Hold shift + click to select a range
38b1dc6
model cache
wynot12 74c3bd7
metric profiling
wynot12 7f1da6a
fix pull tracking
wynot12 c169a1b
fix
wynot12 66f227f
update local cache
wynot12 416597b
minor
wynot12 3a9d465
Add comments
wynot12 eccdb42
style error
wynot12 5d2890c
user parameter to enable model cache
wynot12 746f4db
Merge branch 'master' into model-cache
yunseong bfc9e35
Merge branch 'master' of github.com:cmssnu/cay into model-cache
wynot12 813a527
use guava loading cache
wynot12 22ddc14
fix style error
wynot12 e446908
Merge branch 'model-cache' of github.com:cmssnu/cay into model-cache
wynot12 a2300e6
manual refresh of cache
wynot12 27a5c2e
stop refreshing cache
wynot12 31f0b0a
Merge branch 'master' of github.com:cmssnu/cay into model-cache
wynot12 9c70206
Merge branch 'master' of github.com:cmssnu/cay into model-cache
wynot12 209156c
multi-key push
wynot12 ab77472
use multi-get for refreshing model cache
wynot12 288f823
Pull tracing in cache's loadAll
wynot12 eef199c
Merge remote-tracking branch 'origin' into model-cache
wynot12 9b6eead
style error fix
wynot12 bc10f64
fix style error
wynot12 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
195 changes: 195 additions & 0 deletions
195
dolphin/async/src/main/java/edu/snu/cay/dolphin/async/CachedModelAccessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
/* | ||
* Copyright (C) 2017 Seoul National University | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package edu.snu.cay.dolphin.async; | ||
|
||
import com.google.common.cache.CacheBuilder; | ||
import com.google.common.cache.CacheLoader; | ||
import com.google.common.cache.LoadingCache; | ||
import edu.snu.cay.dolphin.async.core.worker.ModelAccessor; | ||
import edu.snu.cay.dolphin.async.metric.Tracer; | ||
import edu.snu.cay.services.et.evaluator.api.Table; | ||
import edu.snu.cay.services.et.evaluator.api.TableAccessor; | ||
import edu.snu.cay.services.et.evaluator.api.UpdateFunction; | ||
import edu.snu.cay.services.et.exceptions.TableNotExistException; | ||
import org.apache.reef.tang.annotations.Parameter; | ||
|
||
import javax.inject.Inject; | ||
import java.util.*; | ||
import java.util.concurrent.*; | ||
|
||
/** | ||
* A {@link ModelAccessor} implementation with model cache. | ||
*/ | ||
public final class CachedModelAccessor<K, P, V> implements ModelAccessor<K, P, V> { | ||
private static final int MODEL_REFRESH_SEC = 10; // TODO #1254: introduce a sophisticated cache policy | ||
private static final int CACHE_CONCURRENCY_WRITES = 4; | ||
|
||
private final LoadingCache<K, V> modelLoadingCache; | ||
|
||
private final Table<K, V, P> modelTable; | ||
private final UpdateFunction<K, V, P> modelUpdateFunction; | ||
|
||
private final ScheduledExecutorService refreshExecutor; | ||
|
||
private final Tracer pushTracer = new Tracer(); | ||
private final Tracer pullTracer = new Tracer(); | ||
|
||
@Inject | ||
private CachedModelAccessor(@Parameter(DolphinParameters.ModelTableId.class) final String modelTableId, | ||
final TableAccessor tableAccessor, | ||
final UpdateFunction<K, V, P> modelUpdateFunction) throws TableNotExistException { | ||
this.modelTable = tableAccessor.getTable(modelTableId); | ||
this.modelUpdateFunction = modelUpdateFunction; | ||
|
||
this.modelLoadingCache = initCache(); | ||
|
||
refreshExecutor = Executors.newSingleThreadScheduledExecutor(); | ||
refreshExecutor.scheduleWithFixedDelay(() -> { | ||
final Set<K> keys = modelLoadingCache.asMap().keySet(); | ||
|
||
if (!keys.isEmpty()) { | ||
final List<K> keyList = new ArrayList<>(keys.size()); | ||
try { | ||
pullTracer.startTimer(); | ||
final Map<K, V> kvMap = modelTable.multiGetOrInit(keyList).get(); | ||
pullTracer.recordTime(keys.size()); | ||
|
||
kvMap.forEach(modelLoadingCache::put); | ||
} catch (InterruptedException | ExecutionException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
}, 0, MODEL_REFRESH_SEC, TimeUnit.SECONDS); | ||
} | ||
|
||
public void stopRefreshingCache() { | ||
refreshExecutor.shutdown(); | ||
} | ||
|
||
private LoadingCache<K, V> initCache() { | ||
return CacheBuilder.newBuilder() | ||
.concurrencyLevel(CACHE_CONCURRENCY_WRITES) | ||
.build(new CacheLoader<K, V>() { | ||
@Override | ||
public V load(final K key) throws Exception { | ||
pullTracer.startTimer(); | ||
final Future<V> pullFuture = modelTable.getOrInit(key); | ||
final V value = pullFuture.get(); | ||
pullTracer.recordTime(1); | ||
return value; | ||
} | ||
|
||
@Override | ||
public Map<K, V> loadAll(final Iterable<? extends K> keys) throws Exception { | ||
final List<K> keyList = new ArrayList<>(); | ||
keys.forEach(keyList::add); | ||
|
||
pullTracer.startTimer(); | ||
final Map<K, V> kvMap = modelTable.multiGetOrInit(keyList).get(); | ||
pullTracer.recordTime(kvMap.size()); | ||
|
||
return kvMap; | ||
} | ||
}); | ||
} | ||
|
||
/** | ||
* Push a delta value for a key, applying the change to cache. | ||
*/ | ||
@Override | ||
public void push(final K key, final P deltaValue) { | ||
pushTracer.startTimer(); | ||
modelTable.updateNoReply(key, deltaValue); | ||
pushTracer.recordTime(1); | ||
|
||
// update value in cache. this modification will not cause entry loading in cache. | ||
modelLoadingCache.asMap(). | ||
computeIfPresent(key, (k, oldValue) -> modelUpdateFunction.updateValue(k, oldValue, deltaValue)); | ||
} | ||
|
||
@Override | ||
public void push(final Map<K, P> keyToDeltaValueMap) { | ||
pushTracer.startTimer(); | ||
modelTable.multiUpdateNoReply(keyToDeltaValueMap); | ||
pushTracer.recordTime(keyToDeltaValueMap.size()); | ||
|
||
// update value in cache. this modification will not cause entry loading in cache. | ||
keyToDeltaValueMap.forEach((key, deltaValue) -> modelLoadingCache.asMap(). | ||
computeIfPresent(key, (k, oldValue) -> modelUpdateFunction.updateValue(k, oldValue, deltaValue))); | ||
} | ||
|
||
/** | ||
* Retrieve a value for a requested key. | ||
* Pull value from servers, if cache does not have value for the key. | ||
*/ | ||
@Override | ||
public V pull(final K key) { | ||
try { | ||
return modelLoadingCache.get(key); | ||
} catch (ExecutionException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
/** | ||
* Retrieve values for requested keys. | ||
* Pull values from servers, if cache does not have all values for the keys. | ||
*/ | ||
@Override | ||
public List<V> pull(final List<K> keys) { | ||
try { | ||
final Map<K, V> kvMap = modelLoadingCache.getAll(keys); | ||
final List<V> valueList = new ArrayList<>(keys.size()); | ||
keys.forEach(key -> valueList.add(kvMap.get(key))); | ||
|
||
return valueList; | ||
} catch (ExecutionException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
/** | ||
* This method does not care about cache. | ||
*/ | ||
@Override | ||
public List<V> pull(final List<K> keys, final Table<K, V, P> aModelTable) { | ||
try { | ||
final Map<K, V> result = aModelTable.multiGetOrInit(keys).get(); | ||
|
||
final List<V> valueList = new ArrayList<>(keys.size()); | ||
keys.forEach(key -> valueList.add(result.get(key))); | ||
|
||
return valueList; | ||
|
||
} catch (InterruptedException | ExecutionException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
@Override | ||
public Map<String, Double> getAndResetMetrics() { | ||
final Map<String, Double> metrics = new HashMap<>(); | ||
metrics.put(METRIC_TOTAL_PULL_TIME_SEC, pullTracer.totalElapsedTime()); | ||
metrics.put(METRIC_TOTAL_PUSH_TIME_SEC, pushTracer.totalElapsedTime()); | ||
metrics.put(METRIC_AVG_PULL_TIME_SEC, pullTracer.avgTimePerElem()); | ||
metrics.put(METRIC_AVG_PUSH_TIME_SEC, pushTracer.avgTimePerElem()); | ||
|
||
pullTracer.resetTrace(); | ||
pushTracer.resetTrace(); | ||
return metrics; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It'd be great if we clarify what are the differences between
pull(List<K> keys, Table table)
andpull(List<K> keys)
. The distinction is missing in the base interface, but we differentiate them in this implementation (with cache vs. without cache).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm.. actually this part is completely same with no-cache version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, then the question would be more about
why do we not care about cache in this method, while
pull(final List<K> keys)
gets the data from the cache?This question raised my original question above: what are the differences between the two methods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pull(List<K> keys, Table table)
is for using other tables that has no caches.This
ModelAccessor
implementation provides a cache only for a table in its field.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pull(List<K> keys, Table table)
has been inserted to support offline model evaluation.So this method may seem awkward in
ModelAccessor
interface.