-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CAY-1251] Introduce worker-side model cache #1252
Changes from 10 commits
38b1dc6
74c3bd7
7f1da6a
c169a1b
66f227f
416597b
3a9d465
eccdb42
5d2890c
746f4db
bfc9e35
813a527
22ddc14
e446908
a2300e6
27a5c2e
31f0b0a
9c70206
209156c
ab77472
288f823
eef199c
9b6eead
bc10f64
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
/* | ||
* Copyright (C) 2017 Seoul National University | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package edu.snu.cay.dolphin.async; | ||
|
||
import edu.snu.cay.dolphin.async.metric.Tracer; | ||
import edu.snu.cay.services.et.evaluator.api.Table; | ||
import edu.snu.cay.services.et.evaluator.api.TableAccessor; | ||
import edu.snu.cay.services.et.evaluator.api.UpdateFunction; | ||
import edu.snu.cay.services.et.exceptions.TableNotExistException; | ||
import org.apache.reef.tang.annotations.Parameter; | ||
|
||
import javax.inject.Inject; | ||
import java.util.*; | ||
import java.util.concurrent.*; | ||
|
||
/** | ||
* A {@link ModelAccessor} implementation with model cache. | ||
*/ | ||
public final class CachedModelAccessor<K, P, V> implements ModelAccessor<K, P, V> { | ||
|
||
private final Map<K, V> cache = new ConcurrentHashMap<>(); | ||
|
||
private final Table<K, V, P> modelTable; | ||
private final UpdateFunction<K, V, P> modelUpdateFunction; | ||
|
||
private final Tracer pushTracer = new Tracer(); | ||
private final Tracer pullTracer = new Tracer(); | ||
|
||
@Inject | ||
private CachedModelAccessor(@Parameter(DolphinParameters.ModelTableId.class) final String modelTableId, | ||
final TableAccessor tableAccessor, | ||
final UpdateFunction<K, V, P> modelUpdateFunction) throws TableNotExistException { | ||
this.modelTable = tableAccessor.getTable(modelTableId); | ||
this.modelUpdateFunction = modelUpdateFunction; | ||
|
||
// TODO #00: introduce a sophisticated cache refresh/eviction policy | ||
Executors.newSingleThreadScheduledExecutor() | ||
.scheduleWithFixedDelay(this::refreshCache, 10, 10, TimeUnit.SECONDS); | ||
} | ||
|
||
/** | ||
* Push a delta value for a key, applying the change to cache. | ||
*/ | ||
@Override | ||
public void push(final K key, final P deltaValue) { | ||
pushTracer.startTimer(); | ||
modelTable.updateNoReply(key, deltaValue); | ||
pushTracer.recordTime(1); | ||
|
||
// update local cache. oldValue always exists | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm a bit confused with the phrase |
||
cache.compute(key, (k, oldValue) -> modelUpdateFunction.updateValue(k, oldValue, deltaValue)); | ||
} | ||
|
||
/** | ||
* Retrieve a value for a requested key. | ||
* Pull value from servers, if cache does not have value for the key. | ||
*/ | ||
@Override | ||
public V pull(final K key) { | ||
// 1. in cache | ||
final V cachedValue = cache.get(key); | ||
if (cachedValue != null) { | ||
return cachedValue; | ||
} else { | ||
// 2. not in cache | ||
final V pulledValue; | ||
try { | ||
pullTracer.startTimer(); | ||
pulledValue = modelTable.getOrInit(key).get(); | ||
pullTracer.recordTime(1); | ||
} catch (InterruptedException | ExecutionException e) { | ||
throw new RuntimeException(e); | ||
} | ||
cache.put(key, pulledValue); | ||
return pulledValue; | ||
} | ||
} | ||
|
||
/** | ||
* Retrieve values for requested keys. | ||
* Pull values from servers, if cache does not have all values for the keys. | ||
*/ | ||
@Override | ||
public List<V> pull(final List<K> keys) { | ||
// 1. all values are in cache | ||
if (cache.keySet().containsAll(keys)) { | ||
final List<V> resultValues = new ArrayList<>(keys.size()); | ||
keys.forEach(key -> resultValues.add(cache.get(key))); | ||
return resultValues; | ||
} else { | ||
// 2. some values are not in cache | ||
final Map<K, V> resultMap = new HashMap<>(keys.size()); | ||
final Map<K, Future<V>> pullFutures = new HashMap<>(); | ||
for (final K key : keys) { | ||
final V value = cache.get(key); | ||
if (value == null) { | ||
pullFutures.put(key, modelTable.getOrInit(key)); | ||
} else { | ||
resultMap.put(key, value); | ||
} | ||
} | ||
|
||
if (!pullFutures.isEmpty()) { | ||
pullTracer.startTimer(); | ||
// pull non-cached values | ||
pullFutures.forEach((key, valueFuture) -> { | ||
final V value; | ||
try { | ||
value = valueFuture.get(); | ||
} catch (InterruptedException | ExecutionException e) { | ||
throw new RuntimeException(e); | ||
} | ||
cache.put(key, value); | ||
resultMap.put(key, value); | ||
}); | ||
pullTracer.recordTime(pullFutures.size()); | ||
} | ||
|
||
return new ArrayList<>(resultMap.values()); | ||
} | ||
} | ||
|
||
/** | ||
* Retrieve values for requested keys directly from the given table, bypassing the cache: cached entries are neither read nor updated. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It'd be great if we clarify what are the differences between There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm.. actually this part is completely same with no-cache version. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh, then the question would be more about This question raised my original question above: what are the differences between the two methods? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
*/ | ||
@Override | ||
public List<V> pull(final List<K> keys, final Table aModelTable) { | ||
final List<Future<V>> resultList = new ArrayList<>(keys.size()); | ||
keys.forEach(key -> resultList.add(aModelTable.getOrInit(key))); | ||
|
||
final List<V> resultValues = new ArrayList<>(keys.size()); | ||
for (final Future<V> opResult : resultList) { | ||
V result; | ||
while (true) { | ||
try { | ||
result = opResult.get(); | ||
break; | ||
} catch (InterruptedException e) { | ||
// ignore and keep waiting | ||
} catch (ExecutionException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
resultValues.add(result); | ||
} | ||
|
||
return resultValues; | ||
} | ||
|
||
@Override | ||
public Map<String, Double> getAndResetMetrics() { | ||
final Map<String, Double> metrics = new HashMap<>(); | ||
metrics.put(METRIC_TOTAL_PULL_TIME_SEC, pullTracer.totalElapsedTime()); | ||
metrics.put(METRIC_TOTAL_PUSH_TIME_SEC, pushTracer.totalElapsedTime()); | ||
metrics.put(METRIC_AVG_PULL_TIME_SEC, pullTracer.avgTimePerElem()); | ||
metrics.put(METRIC_AVG_PUSH_TIME_SEC, pushTracer.avgTimePerElem()); | ||
|
||
pullTracer.resetTrace(); | ||
pushTracer.resetTrace(); | ||
return metrics; | ||
} | ||
|
||
private void refreshCache() { | ||
final Set<K> keys = cache.keySet(); | ||
|
||
if (!keys.isEmpty()) { | ||
pullTracer.startTimer(); | ||
final Map<K, Future<V>> pullFutures = new HashMap<>(keys.size()); | ||
keys.forEach(key -> pullFutures.put(key, modelTable.getOrInit(key))); | ||
|
||
pullFutures.forEach((key, pullFuture) -> { | ||
try { | ||
cache.put(key, pullFuture.get()); | ||
} catch (InterruptedException | ExecutionException e) { | ||
throw new RuntimeException(e); | ||
} | ||
}); | ||
pullTracer.recordTime(pullFutures.size()); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please update the issue number to #1254