Spring Boot整合Memcached

前面已经提到,缓存是快速提升系统性能,缓解瓶颈的有效手段。

缓存的种类多种多样,小到CPU的缓存,大到静态生成的页面缓存。在本小节中,我们主要讨论在Spring Boot中整合如下两种缓存:

  • 本地缓存: 在内存中开辟一小块空间,用于缓存,速度很快,但容量受限。我们采用Gruva中的缓存实现。
  • 网络缓存: 同一微服务的不同节点间,通过网络共享,例如Memcached。

通用缓存接口

既然要在微服务中支持2种缓存,不妨设计一个较为通用的接口:

  1. public interface ICache<K, V> {
  2. @Nullable
  3. V get(K key);
  4. @Nullable
  5. default V cacheGet(K key, Function<K, V> func, int ttlSecs) {
  6. V val = get(key);
  7. if (val != null) {
  8. return val;
  9. } else {
  10. val = func.apply(key);
  11. put(key, val, ttlSecs);
  12. return val;
  13. }
  14. }
  15. default V cacheGet(K key, Function<K, V> func) {
  16. return cacheGet(key, func, 0);
  17. }
  18. Map<K, V> batchGet(Collection<K> keys);
  19. default Map<K, V> batchCacheGet(Collection<K> keys, Function<Collection<K>, Map<K, V>> func, int ttlSecs) {
  20. // hit map
  21. Map<K, V> hitMap = batchGet(keys);
  22. // miss keys
  23. Collection<K> missedKeys = null;
  24. if (hitMap == null || hitMap.isEmpty()) {
  25. missedKeys = keys;
  26. } else {
  27. missedKeys = keys.stream().filter(k -> !hitMap.containsKey(k)).collect(Collectors.toSet());
  28. }
  29. // check if no miss keys
  30. if (missedKeys == null || missedKeys.isEmpty()) {
  31. return hitMap;
  32. }
  33. // fetch miss key
  34. Map<K, V> missMap = func.apply(missedKeys);
  35. missMap.entrySet().forEach(e -> put(e.getKey(), e.getValue(), ttlSecs));
  36. if (missMap == null || missMap.isEmpty()) {
  37. // no miss map again
  38. return hitMap;
  39. } else {
  40. // union & return
  41. hitMap.putAll(missMap);
  42. return hitMap;
  43. }
  44. }
  45. default Map<K, V> batchCacheGet(Collection<K> keys, Function<Collection<K>, Map<K, V>> func) {
  46. return batchCacheGet(keys, func, 0);
  47. }
  48. void put(K key, V value);
  49. void put(K key, V value, int ttlSecs);
  50. void del(K key);
  51. default void batchDel(Collection<K> keys) {
  52. if (keys != null) {
  53. keys.stream().forEach(key -> del(key));
  54. }
  55. }
  56. void clear();
  57. }

如上所示,我们定义了3大类Cache的基本操作:

  • get/batchGet: 从缓存中获取数据,支持单个或批量操作。
  • put/del: 向缓存中写入或删除,支持设置超时时间。
  • cacheGet: 若缓存中存在则直接返回,否则通过一个Function来生成结果,并写入缓存。类似的,支持单个和批量操作、支持设置超时时间。

LocalCache的实现

在设计Memcached缓存之前,先来看一下本地缓存实现:

  1. public class LocalCache<K, V> implements ICache<K, V> {
  2. private Logger LOG = LoggerFactory.getLogger(getClass());
  3. private Cache<K, V> gCache;
  4. private long ttlSecs = 0;
  5. public LocalCache(long capacity, long ttlSecs) {
  6. CacheBuilder builder = CacheBuilder.newBuilder();
  7. if (capacity > 0) {
  8. builder.maximumSize(capacity);
  9. }
  10. if (ttlSecs > 0) {
  11. this.ttlSecs = ttlSecs;
  12. builder.expireAfterWrite(ttlSecs, TimeUnit.SECONDS);
  13. }
  14. this.gCache = builder.build();
  15. }
  16. @Nullable
  17. @Override
  18. public V get(K key) {
  19. return gCache.getIfPresent(key);
  20. }
  21. @Override
  22. public Map<K, V> batchGet(Collection<K> keys) {
  23. if (keys == null || keys.isEmpty()) {
  24. return new HashMap<>();
  25. } else {
  26. Map<K, V> result = new HashMap<>();
  27. for (K key : keys) {
  28. V val = gCache.getIfPresent(key);
  29. if (val != null) {
  30. result.put(key, val);
  31. }
  32. }
  33. return result;
  34. }
  35. }
  36. @Override
  37. public void put(K key, V value) {
  38. gCache.put(key, value);
  39. }
  40. @Override
  41. public void put(K key, V value, int curTtlSecs) {
  42. if (curTtlSecs != this.ttlSecs) {
  43. LOG.error("not support per-put ttlSecs currently");
  44. }
  45. put(key, value);
  46. }
  47. @Override
  48. public void del(K key) {
  49. gCache.invalidate(key);
  50. }
  51. @Override
  52. public void batchDel(Collection<K> keys) {
  53. gCache.invalidateAll(keys);
  54. }
  55. @Override
  56. public void clear() {
  57. gCache.invalidateAll();
  58. }
  59. }

如上所示,我们调用了Grava的缓存,来实现了本地缓存。

要说明的是,由于Grava的设计限制,目前TTL需要在创建缓存之初就设定好,并不支持per-key的ttl设定。

MemcachedCache的实现

在Spring Boot中集成Memcached,首先要选择一款基于Java的客户端,比较成熟的开源项目有:

  • spymemcached
  • XMemcached在本小节中,我们选择社区更为活跃的XMemcached,它支持线程池、一致性哈系等较为重要的特性。

首先来看一下客户端的构造:

  1. public class MemcachedClientBuilder2 {
  2. public static MemcachedClient build(String serverList, int connPoolSize) throws IOException {
  3. MemcachedClientBuilder builder = new XMemcachedClientBuilder(
  4. AddrUtil.getAddresses(serverList));
  5. // conn pool
  6. builder.setConnectionPoolSize(connPoolSize);
  7. // consistent hash
  8. builder.setSessionLocator(new KetamaMemcachedSessionLocator());
  9. return builder.build();
  10. }
  11. }

如上所示,Builder主要设定了两个参数:

  • serverList: 服务器列表(形如ip:port,若有多个可通过空格分割开)
  • connPoolSize: 线程池大小

如果你仔细观察ICache接口,可以发现它是泛型的ICache,K和V可以是任意类型。

然而,Memcached的设计较为轻量,Key必须是字符串,而Value则是byte数组。

所以,需要设计一种通用的方式,以方便泛型数据类型到Memcached的Key/Value转换。

我们将这一转换逻辑抽提成Key/Value的Transformer, Key的:

  1. public interface CacheKeyTransformer<T> {
  2. String getKey(T t);
  3. }

和Value的

  1. public interface CacheValueTransformer<T> {
  2. byte[] serialize(T obj);
  3. T deserialize(byte[] bytes);
  4. }

这两个接口看起来很抽象,我们首先来看一下DefaultCacheKeyTransformer,实现了任何类型到String(Memcached Key类型)的转换:

  1. public class DefaultCacheKeyTransformer<T> implements CacheKeyTransformer<T> {
  2. private String cacheType;
  3. public DefaultCacheKeyTransformer(String cacheType) {
  4. this.cacheType = cacheType;
  5. }
  6. @Override
  7. public String getKey(T t) {
  8. return cacheType + "#" + t.toString();
  9. }
  10. }

一种很常见的场景,是将对象序列化为Json然后放到Memcached的Value中,JsonCacheValueTransformer完成了这一过程:

  1. public class JsonCacheValueTransformer<T> implements CacheValueTransformer<T> {
  2. protected final Logger LOG = LoggerFactory.getLogger(getClass());
  3. private ObjectMapper objectMapper;
  4. private Class<T> cls;
  5. public JsonCacheValueTransformer(Class<T> cls) {
  6. this.objectMapper = new ObjectMapper();
  7. this.cls = cls;
  8. }
  9. @Override
  10. public byte[] serialize(T o) {
  11. byte[] defReturn = new byte[1];
  12. try {
  13. if (o == null) {
  14. return defReturn;
  15. }
  16. return objectMapper.writeValueAsBytes(o);
  17. } catch (Exception e) {
  18. LOG.error("JsonCacheValueTransformer serialize exception", e);
  19. return defReturn;
  20. }
  21. }
  22. @Override
  23. public T deserialize(byte[] bytes) {
  24. try {
  25. if (bytes == null) {
  26. return null;
  27. }
  28. return objectMapper.readValue(bytes, cls);
  29. } catch (Exception e) {
  30. LOG.error("JsonCacheValueTransformer deserialize exception", e);
  31. return null;
  32. }
  33. }
  34. }

如上所示,我们应用了Jackson来实现了Json的序列化(反序列化),并适配了byte数组到字符串的转换。

此外,Cache的Value中直接存储Integer/Long/String也较为常见,感兴趣的可以直接查看lmsia-cache项目的源代码,这里不再赘述。

实现了Key/Value的序列化之后,我们看一下具体的MemcachedCache实现:

  1. public abstract class AbstractMemcachedCache<K, V> implements ICache<K, V> {
  2. protected final Logger LOG = LoggerFactory.getLogger(getClass());
  3. private static final int connPoolSize = 16;
  4. protected abstract MemcachedClient getMemcachedClient();
  5. protected abstract CacheKeyTransformer<K> getKeyTransformer();
  6. protected abstract CacheValueTransformer<V> getValueTransformer();
  7. private Transcoder<byte[]> transcoder = new Transcoder<byte[]>() {
  8. @Override
  9. public void setPrimitiveAsString(boolean primitiveAsString) {
  10. }
  11. @Override
  12. public void setPackZeros(boolean packZeros) {
  13. }
  14. @Override
  15. public void setCompressionThreshold(int to) {
  16. }
  17. @Override
  18. public void setCompressionMode(CompressionMode compressMode) {
  19. }
  20. @Override
  21. public boolean isPrimitiveAsString() {
  22. return false;
  23. }
  24. @Override
  25. public boolean isPackZeros() {
  26. return false;
  27. }
  28. @Override
  29. public CachedData encode(byte[] o) {
  30. return new CachedData(0, o);
  31. }
  32. @Override
  33. public byte[] decode(CachedData d) {
  34. if (d != null) {
  35. return d.getData();
  36. } else {
  37. return null;
  38. }
  39. }
  40. };
  41. public void init() throws Exception {
  42. // check
  43. if (getKeyTransformer() == null) {
  44. throw new RuntimeException("keyTransformer can not be null");
  45. }
  46. if (getValueTransformer() == null) {
  47. throw new RuntimeException("valueTransformer can not be null");
  48. }
  49. }
  50. @Nullable
  51. @Override
  52. public V get(K key) {
  53. try {
  54. byte[] bytes = getMemcachedClient().get(getKeyTransformer().getKey(key), transcoder);
  55. if (bytes == null) {
  56. return null;
  57. }
  58. return getValueTransformer().deserialize(bytes);
  59. } catch (Exception e) {
  60. LOG.error("memcached get exception", e);
  61. return null;
  62. }
  63. }
  64. @Override
  65. public Map<K, V> batchGet(Collection<K> keys) {
  66. if (keys == null || keys.isEmpty()) {
  67. return new HashMap<>();
  68. }
  69. Map<K, String> key2idMap = new HashMap<>();
  70. for (K key : keys) {
  71. key2idMap.put(key, getKeyTransformer().getKey(key));
  72. }
  73. Collection<String> ids = key2idMap.values();
  74. try {
  75. Map<String, byte[]> map = getMemcachedClient().get(ids, transcoder);
  76. if (map == null || map.isEmpty()) {
  77. return new HashMap<>();
  78. }
  79. Map<K, V> result = new HashMap<>();
  80. for (Entry<K, String> entry : key2idMap.entrySet()) {
  81. K key = entry.getKey();
  82. String id = entry.getValue();
  83. byte[] bytes = map.get(id);
  84. if (bytes != null) {
  85. result.put(key, getValueTransformer().deserialize(bytes));
  86. }
  87. }
  88. return result;
  89. } catch (Exception e) {
  90. LOG.error("batchGet exception", e);
  91. return new HashMap<>();
  92. }
  93. }
  94. @Override
  95. public void put(K key, V value) {
  96. put(key, value, 0);
  97. }
  98. @Override
  99. public void put(K key, V value, int ttlSecs) {
  100. try {
  101. getMemcachedClient().add(
  102. getKeyTransformer().getKey(key),
  103. ttlSecs,
  104. getValueTransformer().serialize(value));
  105. } catch (Exception e) {
  106. LOG.error("memcached put exception", e);
  107. }
  108. }
  109. @Override
  110. public void del(K key) {
  111. try {
  112. getMemcachedClient().delete(getKeyTransformer().getKey(key));
  113. } catch (Exception e) {
  114. LOG.error("memcached del exception", e);
  115. }
  116. }
  117. @Override
  118. public void clear() {
  119. try {
  120. getMemcachedClient().flushAll();
  121. } catch (Exception e) {
  122. LOG.error("memcached flushAll exception", e);
  123. }
  124. }
  125. }

如上所示,AbstractMemcachedCache预留了3个抽象getter方法:

  • memcachedClient
  • keyTransfomer
  • valueTransfomer

实现者可以根据自己的需求来实现。

自动配置

前面已经提到,MemcachedCache依赖MemcachedClient的实例。

如果每次都要手动构造MemcachedClient,实在是有些繁琐,我们可以通过Spring Boot的自动配置来自动注入:

  1. @Configuration
  2. @ConfigurationProperties(prefix = "memcached")
  3. public class MemcachedClientAutoConfiguration {
  4. // Server list seperate by space
  5. private String serverList;
  6. // Connection Pool Size, default 64
  7. private int connPoolSize = 64;
  8. public String getServerList() {
  9. return serverList;
  10. }
  11. public void setServerList(String serverList) {
  12. this.serverList = serverList;
  13. }
  14. public int getConnPoolSize() {
  15. return connPoolSize;
  16. }
  17. public void setConnPoolSize(int connPoolSize) {
  18. this.connPoolSize = connPoolSize;
  19. }
  20. @Bean
  21. @ConditionalOnMissingBean(MemcachedClient.class)
  22. public MemcachedClient createMemcachedClient() throws IOException {
  23. return MemcachedClientBuilder2.build(serverList, connPoolSize);
  24. }
  25. }

如上所示,上述自动配置会扫描配置文件:

  • 若发现”memcached”开头的配置,会尝试解析其serverList和connPoolSize字段。
  • 若解析成功,会调用之前介绍的Builder,自动生成一个MemcachedClient。

Memcached的应用案例

我们通过一个简单的案例来说明MemcachedCache的使用。

设计一个接口,返回10秒内每个用户的第一次访问的时间戳。

我们通过MemcachedCache来实现,首先定义Cache:

  1. @Service
  2. public class TimestampMemcachedCache extends AbstractMemcachedCache<Integer, Long> {
  3. @Autowired
  4. private MemcachedClient client;
  5. private CacheKeyTransformer<Integer> keyTransformer = new DefaultCacheKeyTransformer<>("timestamp");
  6. private CacheValueTransformer<Long> valueTransformer = new LongValueTransformer();
  7. @Override
  8. protected MemcachedClient getMemcachedClient() {
  9. return client;
  10. }
  11. @Override
  12. protected CacheKeyTransformer<Integer> getKeyTransformer() {
  13. return keyTransformer;
  14. }
  15. @Override
  16. protected CacheValueTransformer<Long> getValueTransformer() {
  17. return valueTransformer;
  18. }
  19. }

如上所示,我们定义了类型的Cache,其中Integer的Key表示用户Id,Long类型的Value表示时间戳。

上述的MemcachedClient是自动注入的,我们需要做一下配置:

  1. memcached.serverList: "127.0.0.1:11211"

在使用的Service中,如下使用:

  1. @Autowired
  2. private TimestampMemcachedCache cache;
  3. @Override
  4. public String getCacheTimestampByUserId(int usrId) {
  5. return String.valueOf(cache.cacheGet(userId, key -> System.currentTimeMillis(), 10));
  6. }

如上所示,我们用了cacheGet来分别缓存最新时间戳,过期时间设为10秒钟。

通过这个例子,你一定体会到了:有了ICache等封装后,Memcached的使用变得非常简单。

小结

在本节中,我们设计了通过了ICache接口,并实现了LocalCache、MemcachedCache两种不同的Cache。其中,我们重点探讨了MemcachedClient实现的细节,包括MemcachedClient的自动注入、Memcached数据类型的转换(Transfomer)。

拓展阅读

  1. 实际的应用中,缓存的更新是一个较为复杂的任务,建议阅读缓存更新的套路