Guava 的 cache 提供了 refresh 功能。在指定的时间间隔后，Guava 可以惰性地(lazily)更新缓存。默认的 refresh 实现是同步的(synchronously)，一个线程在更新缓存时，其它线程会等待。具体见 LoadingCache 和 CacheLoader 的 javadoc。
LoadingCache
void refresh(K key)
… Loading is asynchronous only if CacheLoader.reload(K, V) was overridden with an asynchronous implementation.
CacheLoader
public ListenableFuture<V> reload(K key, V oldValue) throws Exception
… This implementation synchronously delegates to load(K).
下面提供了一个异步的CacheLoader实现,使得一个线程在refresh缓存时,其它线程可以不必等待,继续使用旧的缓存值。
/**
 * A {@code CacheLoader} whose {@link #reload(Object, Object)} runs {@code load(key)} on an
 * executor instead of on the calling thread, and hands the resulting future back to the caller.
 *
 * <p>Subclasses implement {@code load(K)}; the same method is used by the background reload.
 * If the background {@code load} throws an exception, the failure is logged and the previous
 * value is returned as the reload result.
 *
 * @param <K> cache key type
 * @param <V> cache value type
 */
public abstract class AsyncCacheLoader<K, V> extends CacheLoader<K, V> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncCacheLoader.class);

    /** Default executor used for reloads: a single-threaded pool wrapped as a listening executor. */
    private final ListeningExecutorService executorService =
            MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

    /**
     * Returns the executor on which reloads are run.
     *
     * <p>The default ExecutorService is single-threaded.
     * Override this method to provide a meaningful multiple-thread implementation.
     *
     * @return the executor that {@link #reload(Object, Object)} submits its work to
     */
    protected ListeningExecutorService executorService() {
        return executorService;
    }

    /**
     * Submits a {@code load(key)} call to {@link #executorService()} and returns its future
     * without waiting for the result.
     *
     * @param key      the key whose value is being reloaded
     * @param oldValue the value currently associated with {@code key}; used as the result when
     *                 the background {@code load} fails with an exception
     * @return a future that completes with the newly loaded value, or with {@code oldValue}
     *         if loading threw an exception
     * @throws Exception declared by the overridden method; this implementation itself only
     *                   submits a task
     */
    @Override
    public ListenableFuture<V> reload(final K key, final V oldValue) throws Exception {
        LOGGER.info("reload {}, try to call load() async-ly", key);
        // Load the new value on the executor so that the thread asking for a reload is not
        // held up by a slow load().
        return executorService().submit(new Callable<V>() {
            @Override
            public V call() throws Exception {
                try {
                    return load(key);
                } catch (Exception ex) {
                    if (ex instanceof InterruptedException) {
                        // Do not swallow the interruption: re-assert the flag so code running
                        // later on this worker thread can still observe it.
                        Thread.currentThread().interrupt();
                    }
                    LOGGER.error("Exception happens when reload " + key + ", return old value.", ex);
                    return oldValue;
                } finally {
                    LOGGER.info("reload {}, async-ly load() done.", key);
                }
            }
        });
    }
}
单元测试：
/**
 * A {@code Ticker} whose reading only changes when {@link #advance(long, TimeUnit)} is called,
 * which makes it easy to control elapsed time from a test.
 */
static class FakeTicker extends Ticker {

    /** Current reading in nanoseconds; starts at zero. */
    private final AtomicLong elapsedNanos = new AtomicLong();

    /**
     * Returns the current fake reading.
     *
     * @return the accumulated number of nanoseconds added so far
     */
    @Override
    public long read() {
        return elapsedNanos.get();
    }

    /**
     * Moves the fake reading by the given amount.
     *
     * @param time     amount of time to add, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code time}
     */
    public void advance(long time, TimeUnit timeUnit) {
        final long deltaNanos = timeUnit.toNanos(time);
        elapsedNanos.addAndGet(deltaNanos);
    }
}
/**
 * Checks that, once the refresh interval has passed, readers still get the old value while a
 * slow reload is running, and get the new value after the reload has had time to finish.
 *
 * <p>The first {@code load} call returns immediately with the old value; every later call
 * sleeps for {@code LOAD_WAIT_TIME} milliseconds and then returns the new value.
 */
@Test
public void refreshIsNoBlockingTest() {
    final String OLD_VALUE = "OLD_VALUE";
    final String NEW_VALUE = "NEW_VALUE";
    final String KEY = "KEY";
    final long LOAD_WAIT_TIME = 1000L;
    final long REFRESH_INTERVAL = 10;

    FakeTicker fakeTicker = new FakeTicker();
    final AtomicBoolean firstCall = new AtomicBoolean(true);
    final LoadingCache<String, String> cache = CacheBuilder.newBuilder()
            .maximumSize(30)
            .ticker(fakeTicker)
            // expireAfterWrite(REFRESH_INTERVAL, TimeUnit.SECONDS) is intentionally not used:
            // it does not trigger a refresh.
            // Set the refresh interval of the cache.
            .refreshAfterWrite(REFRESH_INTERVAL, TimeUnit.SECONDS)
            .build(new AsyncCacheLoader<String, String>() {
                @Override
                public String load(String key) throws Exception {
                    LOGGER.info("load(), starts ...");
                    // Atomically claim the "first call" so that exactly one invocation takes
                    // the fast path, even if two loads were ever to overlap.
                    if (firstCall.compareAndSet(true, false)) {
                        LOGGER.info("load(), first invocation, return quickly.");
                        return OLD_VALUE;
                    }
                    LOGGER.info("load(), heavy work starts ...");
                    Thread.sleep(LOAD_WAIT_TIME); // make every later load slow
                    LOGGER.info("load(), heavy work done ...");
                    return NEW_VALUE; // a different value, so the refresh can be observed
                }
            });

    String firstValue = cache.getUnchecked(KEY);
    Assert.assertEquals(OLD_VALUE, firstValue);

    // Move the fake clock past the refresh interval.
    fakeTicker.advance(REFRESH_INTERVAL + 10, TimeUnit.SECONDS);

    // Two separate pools, so that the two clients cannot end up sharing a thread.
    ExecutorService executorService = Executors.newCachedThreadPool();
    ExecutorService executorService2 = Executors.newCachedThreadPool();
    try {
        Future<String> client1 = executorService.submit(new Callable<String>() {
            public String call() {
                LOGGER.info("client 1 is checking the cache");
                return cache.getUnchecked(KEY);
            }
        });
        Future<String> client2 = executorService2.submit(new Callable<String>() {
            public String call() {
                LOGGER.info("client 2 is checking the cache");
                return cache.getUnchecked(KEY);
            }
        });

        String client1Value = client1.get();
        String client2Value = client2.get();

        // Both clients are expected to still see the old value, because the new value is
        // being loaded asynchronously and the refresh has not finished yet.
        Assert.assertEquals(OLD_VALUE, client1Value);
        Assert.assertEquals(OLD_VALUE, client2Value);

        Thread.sleep(LOAD_WAIT_TIME + 500); // wait for the refresh to complete

        // By now the new value should be available.
        Assert.assertEquals(NEW_VALUE, cache.getUnchecked(KEY));
    } catch (InterruptedException e) {
        // Keep the interrupt visible to the test runner before reporting the failure.
        Thread.currentThread().interrupt();
        Assert.fail("Interrupted while waiting for the cache refresh: " + e);
    } catch (ExecutionException e) {
        Assert.fail("A cache client task failed: " + e.getCause());
    } finally {
        // Release the client threads whether the assertions passed or not.
        executorService.shutdownNow();
        executorService2.shutdownNow();
    }
}