Guava的cache提供了refresh功能。 在指定的时间间隔后,guava可以(惰性地/lazily)更新缓存。 默认的refresh实现是同步的(synchronously),一个线程在更新缓存时,其它线程会等待。 具体见LoadingCacheCacheLoader的javadoc。

LoadingCache
void refresh(K key)
… Loading is asynchronous only if CacheLoader.reload(K, V) was overridden with an asynchronous implementation.

CacheLoader
public ListenableFuture<V> reload(K key, V oldValue) throws Exception
… This implementation synchronously delegates to load(K).

下面提供了一个异步的CacheLoader实现,使得一个线程在refresh缓存时,其它线程可以不必等待,继续使用旧的缓存值。

public abstract class AsyncCacheLoader<K, V> extends CacheLoader<K, V> {
    private final static Logger LOGGER = LoggerFactory.getLogger(AsyncCacheLoader.class);

    private final ListeningExecutorService executorService =
            MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

    /**
     * The default ExecutorService is single-threaded.
     * Override this method to provide a meaningful multiple-thread implementation.
     */
    protected ListeningExecutorService executorService() {
        return executorService;
    }

    @Override
    public ListenableFuture<V> reload(final K key, final V oldValue) throws Exception {
        LOGGER.info("reload {}, try to call load() async-ly", key);
        // we need to load new values asynchronously, so that calls to read values from the cache don't block
        ListenableFuture<V> listenableFuture = executorService().submit(new Callable<V>() {
            @Override
            public V call() throws Exception {
                try {
                    V value = load(key);
                    return value;
                } catch (Exception ex) {
                    LOGGER.error("Exception happens when reload " + key + ", return old value.", ex);
                    return oldValue;
                } finally {
                    LOGGER.info("reload {}, async-ly load() done.", key);
                }
            }
        });
        return listenableFuture;
    }
}

单元测试,

static class FakeTicker extends Ticker { // 方便在测试里控制时间间隔
	private final AtomicLong nanos = new AtomicLong();

	public void advance(long time, TimeUnit timeUnit) {
		nanos.addAndGet(timeUnit.toNanos(time));
	}

	@Override
	public long read() {
		return nanos.get();
	}
}

@Test
public void refreshIsNoBlockingTest() {
	final String OLD_VALUE = "OLD_VALUE";
	final String NEW_VALUE = "NEW_VALUE";
	final String KEY = "KEY";
	final long LOAD_WAIT_TIME = 1000L;
	final long REFRESH_INTERVAL = 10;

	FakeTicker fakeTicker = new FakeTicker();

	final AtomicBoolean firstCall = new AtomicBoolean(true);
	final LoadingCache<String, String> cache = CacheBuilder.newBuilder()
			.maximumSize(30)
			.ticker(fakeTicker)
			// this method not trigger refresh
			//.expireAfterWrite(REFRESH_INTERVAL, TimeUnit.SECONDS) 
			// 指定缓存刷新时间
			.refreshAfterWrite(REFRESH_INTERVAL, TimeUnit.SECONDS) 
			.build(new AsyncCacheLoader<String, String>() {
				@Override
				public String load(String key) throws Exception {
					LOGGER.info("load(), starts ...");
					if (firstCall.get()) { // 第一次load快速返回
						firstCall.set(false);
						LOGGER.info("load(), first invocation, return quickly.");
						return OLD_VALUE;
					}

					LOGGER.info("load(), heavy work starts ...");
					Thread.sleep(LOAD_WAIT_TIME); // 让后续load返回慢一点
					LOGGER.info("load(), heavy work done ...");
					return NEW_VALUE;       // 返回个新值方便验证
				}
			});


	String firstValue = cache.getUnchecked(KEY);
	Assert.assertEquals(OLD_VALUE, firstValue);

	// 让时钟往前走
	fakeTicker.advance(REFRESH_INTERVAL + 10, TimeUnit.SECONDS); 

	try {
		ExecutorService executorService = Executors.newCachedThreadPool();
		Future<String> client1 = executorService.submit(new Callable<String>() {
			public String call() {
				LOGGER.info("client 1 is checking the cache");
				return cache.getUnchecked(KEY);
			}
		});

		// 新开一个ExecutorService,保证不会重用线程
		ExecutorService executorService2 = Executors.newCachedThreadPool();
		Future<String> client2 = executorService2.submit(new Callable<String>() {
			public String call() {
				LOGGER.info("client 2 is checking the cache");
				return cache.getUnchecked(KEY);
			}
		});

		String client1Value = client1.get();
		String client2Value = client2.get();
		// both client 1 and client 2 will still get old value 
		// since new value is under async-loading
		// refresh还没有完成,返回老的缓存值
		Assert.assertEquals(OLD_VALUE, client1Value); 
		// refresh还没有完成,返回老的缓存值
		Assert.assertEquals(OLD_VALUE, client2Value); 

		Thread.sleep(LOAD_WAIT_TIME + 500);  // 等待refresh完成
		// now new value should be ready
		Assert.assertEquals(NEW_VALUE, cache.getUnchecked(KEY)); // 返回新的值
	} catch (InterruptedException e) {
		Assert.fail();
	} catch (ExecutionException e) {
		Assert.fail();
	}
}