/** * @author zhangshuo */@Slf4jpublic class JedisClusterPipeLine extends PipelineBase implements Closeable {............ private final Queue orderedClients = new LinkedList (); /** 一次pipeline过程中使用到的jedis缓存 */ private final Map poolToJedisMap = new HashMap (); private final JedisSlotBasedConnectionHandler connectionHandler; private final JedisClusterInfoCache clusterInfoCache; public JedisClusterPipeLine(JedisCluster jedisCluster) { this.connectionHandler = ClassUtils.getValue(jedisCluster, SLOT_BASED__CONNECTION_HANDLER_FIELD); this.clusterInfoCache = ClassUtils.getValue(connectionHandler, CLUSTER_INFO_CACHE_FIELD); } @Override protected Client getClient(String key) { return getClient(SafeEncoder.encode(key)); } @Override protected Client getClient(byte[] key) { Client client; log.debug("size of orderedClients : {} , size of poolToJedis : {} ", orderedClients.size(), poolToJedisMap.size()); int slot = JedisClusterCRC16.getSlot(key); JedisPool pool = clusterInfoCache.getSlotPool(slot); Jedis borrowedJedis = poolToJedisMap.get(pool); if (null == borrowedJedis) { borrowedJedis = pool.getResource(); poolToJedisMap.put(pool, borrowedJedis); } client = borrowedJedis.getClient(); orderedClients.add(client); return client; } @Override public void close() { for (Jedis jedis : poolToJedisMap.values()) { jedis.close(); } clean(); orderedClients.clear(); poolToJedisMap.clear(); } public void sync() { for (Client client : orderedClients) { generateResponse(client.getOne()); } } /** * go through all the responses and generate the right response type (warning : * usually it is a waste of time). * * @return A list of all the responses in the order */ public List
性能对比(提升10倍以上):
@Test public void jedisTest() throws UnsupportedEncodingException { long start2 = System.currentTimeMillis(); try (JedisClusterClient jc = jedisClusterClient) { for (int i = 0; i < 100; i++) { jc.set("NO." + i, "value" + i); } } catch (Exception e) { e.printStackTrace(); } System.out.println(System.currentTimeMillis() - start2);// 5688ms } /** * */ @Test public void clusterPipeline() { long start = System.currentTimeMillis(); try (JedisClusterPipeLine pipeline = jedisClusterClient.pipelined()) { for (int i = 0; i < 100; i++) { pipeline.set("NO." + i, "value" + i); } } catch (Exception e) { e.printStackTrace(); } System.out.println(System.currentTimeMillis() - start);// 174ms }}