spring redis源码分析 以及 代码漏洞
spring-data-redis提供了redis操作的封装和实现;RedisTemplate模板类封装了redis连接池管理的逻辑,业务代码无须关心获取,释放连接逻辑;spring redis同时支持了Jedis,Jredis,rjc 客户端操作;
?
spring redis 源码设计逻辑可以分为以下几个方面:
?
- Redis连接管理:封装了Jedis,Jredis,Rjc等不同redis 客户端连接Redis操作封装:value,list,set,sortset,hash划分为不同操作Redis序列化:能够以插件的形式配置想要的序列化实现Redis操作模板化: redis操作过程分为:获取连接,业务操作,释放连接;模板方法使得业务代码只需要关心业务操作Redis事务模块:在同一个回话中,采用同一个redis连接完成

?spring redis连接管理模块分析
spring redis封装了不同redis 客户端,对于底层redis客户端的抽象分装,使其能够支持不同的客户端;连接管理模块的类大概有以下:
?
?
public <T> T execute(SessionCallback<T> session) {RedisConnectionFactory factory = getConnectionFactory();// bind connectionRedisConnectionUtils.bindConnection(factory);try {return session.execute(this);} finally {RedisConnectionUtils.unbindConnection(factory);}}?
?
该方法通过RedisConnectionUtils.bindConnection操作将连接绑定到当前线程,批量方法执行时,获取ThreadLocal中的连接;
执行结束时,调用RedisConnectionUtils.unbindConnection释放当前线程的连接
?
SessionCallback接口方法:
?
?
public interface SessionCallback<T> {/** * Executes all the given operations inside the same session. * * @param operations Redis operations * @return return value */<K, V> T execute(RedisOperations<K, V> operations) throws DataAccessException;}??
批量执行RedisOperation时,通过RedisTemplate的方法执行,代码如下:
?
?
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {Assert.notNull(action, "Callback object must not be null");RedisConnectionFactory factory = getConnectionFactory();RedisConnection conn = RedisConnectionUtils.getConnection(factory);boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);preProcessConnection(conn, existingConnection);boolean pipelineStatus = conn.isPipelined();if (pipeline && !pipelineStatus) {conn.openPipeline();}try {RedisConnection connToExpose = (exposeConnection ? conn : createRedisConnectionProxy(conn));T result = action.doInRedis(connToExpose);// TODO: any other connection processing?return postProcessResult(result, conn, existingConnection);} finally {try {if (pipeline && !pipelineStatus) {conn.closePipeline();}} finally {RedisConnectionUtils.releaseConnection(conn, factory);}}}??
当前线程中绑定连接时,返回绑定的redis连接;保证同一回话中,采用同一个redis连接;
?
?
Spring redis 一些问题连接未关闭问题当数据反序列化存在问题时,redis服务器会返回一个Err报文:Protocol error,之后redis服务器会关闭该链接(redis protocol中未指明该协议);了解的jedis客户端为例,其仅仅将错误报文转化为JedisDataException抛出,也没有处理最后的关闭报文; ?此时spring中 处理异常时,对于JedisDataException依旧认为连接有效,将其回收到jedispool中;当下个操作获取到该链接时,就会抛出“It seems like server has closed the connection.”异常
?
相关代码:
?
jedis Protocol读取返回信息:
?
?
private Object process(final RedisInputStream is) { try { byte b = is.readByte(); if (b == MINUS_BYTE) { processError(is); } else if (b == ASTERISK_BYTE) { return processMultiBulkReply(is); } else if (b == COLON_BYTE) { return processInteger(is); } else if (b == DOLLAR_BYTE) { return processBulkReply(is); } else if (b == PLUS_BYTE) { return processStatusCodeReply(is); } else { throw new JedisConnectionException("Unknown reply: " + (char) b); } } catch (IOException e) { throw new JedisConnectionException(e); } return null; }?
?当redis 服务器返回错误报文时(以-ERR开头),就转换为JedisDataException异常;
?
?
private void processError(final RedisInputStream is) { String message = is.readLine(); throw new JedisDataException(message); }??
Spring redis的各个RedisConnection实现中转换捕获异常,例如JedisConnection 一个操作:
?
?
public Long dbSize() {try {if (isQueueing()) {throw new UnsupportedOperationException();}if (isPipelined()) {throw new UnsupportedOperationException();}return jedis.dbSize();} catch (Exception ex) {throw convertJedisAccessException(ex);}}?
JedisConnection捕获到异常时,调用convertJedisAccessException方法转换异常;
?
?
protected DataAccessException convertJedisAccessException(Exception ex) {if (ex instanceof JedisException) {// check connection flagif (ex instanceof JedisConnectionException) {broken = true;}return JedisUtils.convertJedisAccessException((JedisException) ex);}if (ex instanceof IOException) {return JedisUtils.convertJedisAccessException((IOException) ex);}return new RedisSystemException("Unknown jedis exception", ex);}?可以看到当捕获的异常为JedisConnectionException 时,才将broken设置为true(在关闭连接时,直接销毁Jedis示例);?JedisDataException仅仅进行了转换;
?
JedisConnection释放连接逻辑:
?
?
public void close() throws DataAccessException {// return the connection to the pooltry {if (pool != null) {if (broken) {pool.returnBrokenResource(jedis);}else {// reset the connection if (dbIndex > 0) {select(0);}pool.returnResource(jedis);}}} catch (Exception ex) {pool.returnBrokenResource(jedis);}if (pool != null) {return;}// else close the connection normallytry {if (isQueueing()) {client.quit();client.disconnect();return;}jedis.quit();jedis.disconnect();} catch (Exception ex) {throw convertJedisAccessException(ex);}}?当JedisConnection实例的broken被设置为true时,就会销毁连接;
到此,可以发现当redis服务器返回Protocol error这个特殊类型的错误消息时,会抛出JedisDataException异常,这是spring不会销毁连接,当该链接再次被使用时,就会抛出“It seems like server has closed the connection.”异常。
?
该问题仅仅在发送不完整redis协议(可能是TCP报文错误,操作序列化错误等等)时,发生;PS:不是错误的redis操作,错误命令不一定回导致错误的报文;
?
并且该错误消息在redis协议中也没有指出,因此jedis也没有做处理;修复此问题,可以在spring redis 或者 jedis 中解决;
- spring jedis解决:可以在convertJedisAccessException方法中检查JedisDataException的消息内容是否包含"Protocol error",若包含设置broken = true,销毁连接jedis解决方案:Protocol.processError中检查 错误消息是否包含"Protocol error";如果包含,可以读取最后被忽略的关闭报文,并转换为JedisConnectionException异常抛出
Protocol error异常可以查看redis源码network.c;当redis接收到客户端的请求报文,都会经过检查,当报文不完整,超长等问题时,将抛出Protocol error异常,并关闭连接;该报文没有在redis protocol中明确指明;可参见:http://redis.io/topics/protocol
?
?
redis分库性能问题与连接池泄露当采用redis分库方案时,spring redis 在每次获取连接时,都需要执行select 操作切换到指定库,性能开销大;
?
redis分库操作逻辑:
?
参见RedisTemplate?execute模板方法,调用RedisConnectionUtils.getConnection(factory)获取连接;最终调用doGetConnection:
?
?
public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind) {Assert.notNull(factory, "No RedisConnectionFactory specified");RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);//TODO: investigate tx synchronizationif (connHolder != null)return connHolder.getConnection();if (!allowCreate) {throw new IllegalArgumentException("No connection found and allowCreate = false");}if (log.isDebugEnabled())log.debug("Opening RedisConnection");RedisConnection conn = factory.getConnection();boolean synchronizationActive = TransactionSynchronizationManager.isSynchronizationActive();if (bind || synchronizationActive) {connHolder = new RedisConnectionHolder(conn);if (synchronizationActive) {TransactionSynchronizationManager.registerSynchronization(new RedisConnectionSynchronization(connHolder, factory, true));}TransactionSynchronizationManager.bindResource(factory, connHolder);return connHolder.getConnection();}return conn;}?实际调用的是factory.getConnection方法,参见JedisConnectionFactory:
?
?
public JedisConnection getConnection() {Jedis jedis = fetchJedisConnector();return postProcessConnection((usePool ? new JedisConnection(jedis, pool, dbIndex) : new JedisConnection(jedis,null, dbIndex)));}?
fetchJedisConnector从JedisPool中获取Jedis连接,之后实例化JedisConnection对象:
?
?
public JedisConnection(Jedis jedis, Pool<Jedis> pool, int dbIndex) {this.jedis = jedis;// extract underlying connection for batch operationsclient = (Client) ReflectionUtils.getField(CLIENT_FIELD, jedis);transaction = new Transaction(client);this.pool = pool;this.dbIndex = dbIndex;// select the dbif (dbIndex > 0) {select(dbIndex);}}?可以看到每次都需要重复select操作,这回导致大量的redis 请求,严重影响性能;
?
此外,还存在连接池泄露的问题:
?
?
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {Assert.notNull(action, "Callback object must not be null");RedisConnectionFactory factory = getConnectionFactory();RedisConnection conn = RedisConnectionUtils.getConnection(factory);.....try {......} finally {try {if (pipeline && !pipelineStatus) {conn.closePipeline();}} finally {RedisConnectionUtils.releaseConnection(conn, factory);}}}?
当select操作发生异常时,RedisConnectionUtils.getConnection(factory)抛出异常,此时代码不在try catch块中,这是将无法回收连接,导致连接泄露
?
?
spring redis 设计的一些其他问题:http://ldd600.iteye.com/blog/1115196
?
?