Xmemcached使用【一】
Xmemcached的Wiki:http://code.google.com/p/xmemcached/wiki/User_Guide_zh#与Hibernate-memcached集成
一、软件环境
???? 1、memcached-1.4.5
???? 2、xmemcached-1.4.1
二、简单示例
??? 1、设置节点权重
??????? 如果需要编程设置,通过下面代码:
?MemcachedClientBuilder builder = new????
?XMemcachedClientBuilder(AddrUtil.getAddresses("localhost:12000 localhost:12001"),new int[]{1,3});
?MemcachedClient memcachedClient=builder.build();
传入一个int数组,里面的元素就是节点对应的权重值,比如这里设置"localhost:12000"节点的权重为1,而"localhost:12001"的权重为3。注意,xmemcached的权重是通过复制连接的多个引用来实现的,比如权重为3,那么就复制3个同一个连接的引用放在集合中让MemcachedSessionLocator查找。
?
改变节点权重,可以通过setServerWeight方法:
public interface XMemcachedClientMBean{
???????????? ....
???????? /**
???????? * Set a memcached server's weight
???????? *
???????? * @param server
???????? * @param weight
???????? */
??????? public void setServerWeight(String server, int weight);
?? }
??? 2、使用二进制协议
?????? Memcached 1.4开始正式启用二进制协议,xmemcached 1.2开始支持二进制协议,启用这一特性也非常简单,设置相应的CommandFactory即可:
??? MemcachedClientBuilder builder = new??? XMemcachedClientBuilder(AddrUtil.getAddresses("localhost:12000 localhost:12001"),new int[]{1,3});
? builder.setCommandFactory(new BinaryCommandFactory());//use binary protocol
? MemcachedClient memcachedClient=builder.build();
??? 3、动态添加/删除节点
?????? MemcachedClient client=new XMemcachedClient(AddrUtil.getAddresses("server1:11211 server2:11211"));
?? //Add two new memcached nodes
?? client.addServer("server3:11211 server4:11211");
?? //Remove memcached servers
?? client.removeServer("server1:11211 server2:11211");
??? 4、NIO连接池
??????? Xmemcached是基于java nio的client实现,默认对一个memcached节点只有一个连接,这在通常情况下已经有非常优异的表现。但是在典型的高并发环境下,nio的? 单连接也会遇到性能瓶颈。因此XMemcached支持设置nio的连接池,允许建立多个连接到同一个memcached节点,但是请注意,这些连接之间是不同步的,因此你的应用需要自己保证数据更新的同步,启用连接池可以通过下面代码:
MemcachedClientBuilder builder = new??? XMemcachedClientBuilder(AddrUtil.getAddresses("localhost:12000"));
builder.setConnectionPoolSize(5);
??? 5、Failure模式和standby节点
从1.3版本开始,xmemcached支持failure模式。所谓failure模式是指,当一个memcached节点down掉的时候,发往这个节点的请求将直接失败,而不是发送给下一个有效的memcached节点。具体可以看memcached的文档。默认不启用failure模式,启用failure模式可以通过下列代码:MemcachedClientBuilder builder=……; builder.setFailureMode(true);
不仅如此,xmemcached还支持主辅模式,你可以设置一个memcached的节点的备份节点,当主节点down掉的情况下,会将本来应该发往主节点的请求转发给standby备份节点。使用备份节点的前提是启用failure模式。备份节点设置如下:MemcachedClient builder=new XmemcachedClientBuilder(AddrUtil.getAddressMap("localhost:11211,localhost:11212 host2:11211,host2:11212"));
?
??? 6、与Kestrel交互
?????? Kestrel是twitter开源的一个scala写的简单高效MQ,它支持 memcached文本协议,但是并不完全兼容,例如它不支持flag,导致很多利用flag做序列化的客户端无法正常运作。因此Xmemcached特意提供了KestrelCommandFactory?用于支持Kestrel。使用KestrelCommandFactory?即可拥有如下好处:默认关闭get优化,因为kestrel不支持bulk get;支持kestrel的阻塞获取和可靠获取;允许向kestrel存储任意java序列化类型。设置KestrelCommandFactory:
MemcachedClientBuilder builder = new??? XMemcachedClientBuilder(AddrUtil.getAddresses("localhost:12000 localhost:12001"),new int[]{1,3});
builder.setCommandFactory(new KestrelCommandFactory());
MemcachedClient memcachedClient=builder.build();
?
关于最后一点需要补充说明,由于kestrel不支持flag,因此xmemcached在存储的数据之前加了4个字节的flag,如果你的全部应用都使用xmemcached,那么没有问题,如果使用其他clients,会有兼容性的问题,因此Xmemcached还允许关闭这个功能,通过client.setPrimitiveAsString(true);设置为true后,原生类型都将存储为字符串,而序列化类型将无法存储了。
?
??? 7、与tokyotyrant交互
?????? 通过使用TokyoTyrantTranscoder就可以跟TokyoTyrant进行交互,但是由于TokyoTyrant对memcached文本协议的flag,exptime不支持,因此内部TokyoTyrantTranscoder加了4个字节的flag在value前面,如果你的全部应用都使用xmemcached,那么没有问题,如果使用其他clients,会有兼容性的问题,这一点与跟kestrel交互相同。
MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("localhost:12000 localhost:12001"),new int[]{1,3});
builder.setTranscoder(new TokyoTyrantTranscoder());
MemcachedClient memcachedClient=builder.build();
??? 8、数据压缩
??????? memcached存储大数据的效率是比较低的,当数据比较大的时候xmemcached会帮你压缩在存储,取出来的时候自动解压并反序列化,这个大小阀值默认是16K,可以通过Transcoder接口的setCompressionThreshold(1.2.1引入)方法修改阀值,比如设置为1K:memcachedClient.getTranscoder()).setCompressionThreshold(1024);
??? 9、packZeros
???????? XMemcached的序列化转换器在序列化数值类型的时候有个特殊处理,如果前面N个字节都是0,那么将会去除这些0,缩减后的数据将更小,例如数字3序列化是0x0003,那么前面3个0将去除掉成一个字节0x3。反序列化的时候将自动在前面根据数值类型补0。这一特性默认是开启的,如果考虑到与其他client兼容的话需要关闭此特性可以通过:memcachedClient.getTranscoder()).setPackZeros(false);
??? 10、sanitizeKey????
???????? 在官方客户端有提供一个sanitizeKeys选项,当选择用URL当key的时候,MemcachedClient会自动将URL encode再存储。默认是关闭的,想启用可以通过:???
???????? memcachedClient.setSanitizeKeys(true);
?
配置选项参数表:
属性名
值
servers
memcached节点列表,形如“主节点1:port,备份节点1:port 主节点2:port,备份节点2:port“的字符串,可以不设置备份节点,主备节点逗号隔开,不同分组空格隔开。
weights
与servers对应的节点的权重
authInfoMap
授权验证信息,仅在xmemcached 1.2.5及以上版本有效
connectionPoolSize
nio连接池大小,默认为1
commandFactory
协议工厂,net.rubyeye.xmemcached.command.BinaryCommandFactory,TextCommandFactory(默认),KestrelCommandFactory
sessionLocator
分布策略,一致性哈希net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator或者ArraySessionLocator(默认)
?
transcoder
序列化转换器,默认使用net.rubyeye.xmemcached.transcoders.SerializingTranscoder,更多选项参见javadoc
bufferAllocator
IoBuffer分配器,默认为net.rubyeye.xmemcached.buffer.SimpleBufferAllocator,可选CachedBufferAllocator(不推荐)
failureMode
是否启用failure模式,true为启用,默认不启用
?完整代码示例
memcached.properties
#127.0.0.1:11211 127.0.0.1:11212 addressList=127.0.0.1:11211 127.0.0.1:11212 127.0.0.1:11213 127.0.0.1:11214weights=1,2,3,4#1-30connectionPoolSize=15failureMode=true#----------------------------------------##连接池大小即客户端个数memcached.connectionPoolSize=5memcached.failureMode=true#server1memcached.server1.host=127.0.0.1memcached.server1.port=11211memcached.server1.weight=4#server2memcached.server2.host=127.0.0.1memcached.server2.port=11212memcached.server2.weight=3#server3memcached.server3.host=127.0.0.1memcached.server3.port=11213memcached.server3.weight=2#server4memcached.server4.host=127.0.0.1memcached.server4.port=11214memcached.server4.weight=1
如果配置主备,需要修改?addressList
addressList=127.0.0.1:11211,127.0.0.1:11212 127.0.0.1:11213,127.0.0.1:11214
?
XmemcachedUtil.java
package com.wy.util;import java.io.IOException;import java.util.Properties;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.google.code.yanf4j.util.ResourcesUtils;import net.rubyeye.xmemcached.MemcachedClient;import net.rubyeye.xmemcached.MemcachedClientBuilder;import net.rubyeye.xmemcached.XMemcachedClientBuilder;import net.rubyeye.xmemcached.auth.AuthInfo;import net.rubyeye.xmemcached.command.BinaryCommandFactory;import net.rubyeye.xmemcached.impl.ElectionMemcachedSessionLocator;import net.rubyeye.xmemcached.utils.AddrUtil;/** * Xmemcached工具类 * * @author wy * @serial 2013-08-12 */public class XmemcachedUtil {private static Logger log = LoggerFactory.getLogger(XmemcachedUtil.class);private static MemcachedClientBuilder builder = null;private static MemcachedClient memcacheddClient = null;private static Properties prop = null;static {try {prop = ResourcesUtils.getResourceAsProperties("memcached.properties");} catch (IOException e) {e.printStackTrace();log.error("请配置memcached服务地址!");}if(prop != null){String addrs = prop.getProperty("addressList");String weight = prop.getProperty("weights");if(weight != null){String[] weightsArray = weight.split(",");int len = weightsArray.length;int[] weights = new int[len];for(int i=0;i<len;i++){weights[i] = Integer.parseInt(weightsArray[i]);}//builder = new XMemcachedClientBuilder(AddrUtil.getAddresses(addrs), weights);//standby 主备模式builder = new XMemcachedClientBuilder(AddrUtil.getAddressMap(addrs), weights);}else{builder = new XMemcachedClientBuilder(AddrUtil.getAddressMap(addrs));}}else{log.error("请配置memcached服务地址!");}//设置连接池大小,即客户端个数//xmemcached是基于nio的,通常不要设置连接池,一个连接可以支撑绝大多数应用。//除非你的应用并发规模特别大,比如我压测的时候测试100个线程或者300个线程并发的时候,连接池才能显示出优势。builder.setConnectionPoolSize(Integer.valueOf(prop.getProperty("connectionPoolSize")));//宕机报警builder.setFailureMode(Boolean.valueOf(prop.getProperty("failureMode")));//使用二进制协议,默认使用的TextCommandFactory即文本协议builder.setCommandFactory(new BinaryCommandFactory());//1.Standard Hash, hash(key) mod server_count (余数分布)//默认使用余数分布//2.Consistent Hash(一致性哈希)//builder.setSessionLocator(new KetamaMemcachedSessionLocator());//3.Election Hash(选举散列)builder.setSessionLocator(new ElectionMemcachedSessionLocator());//SASL验证, 客户端授权验证授权验证,仅支持二进制协议builder.addAuthInfo(AddrUtil.getOneAddress("localhost:11213"), AuthInfo.typical("wy", "wy"));}/** * 获取MemcachedClient * * @return */public static MemcachedClient getMemcachedClient(){try {if(memcacheddClient == null){memcacheddClient = builder.build();//当选择用URL当key的时候,MemcachedClient会自动将URL encode再存储。默认是关闭的memcacheddClient.setSanitizeKeys(true);//memcached存储大数据的效率是比较低的,当数据比较大的时候xmemcached会帮你压缩在存储,取出来的时候自动解压并反序列化,这个大小阀值默认是16KmemcacheddClient.getTranscoder().setCompressionThreshold(32768);return memcacheddClient;}} catch (IOException e) {e.printStackTrace();log.error("获取MemcachedClient失败!");}return null;}/** * 关闭MemcachedClient * * @param memcachedClient */public static void close(MemcachedClient memcachedClient){if(memcachedClient != null && !memcachedClient.isShutdown()){try {memcachedClient.shutdown();} catch (IOException e) {e.printStackTrace();}}}}?XmemcachedTest.java
package com.wy;import java.net.URL;import java.util.concurrent.TimeoutException;import net.rubyeye.xmemcached.GetsResponse;import net.rubyeye.xmemcached.KeyIterator;import net.rubyeye.xmemcached.MemcachedClient;import net.rubyeye.xmemcached.exception.MemcachedException;import net.rubyeye.xmemcached.utils.AddrUtil;import org.junit.After;import org.junit.Before;import org.junit.Test;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.wy.util.XmemcachedUtil;/** * * * @author wy * */public class XmemcachedTest {private static Logger log = LoggerFactory.getLogger(XmemcachedTest.class);private MemcachedClient memcachedClient = null;@Beforepublic void setUp() throws Exception {long start1 = System.currentTimeMillis();memcachedClient = XmemcachedUtil.getMemcachedClient();long end1 = System.currentTimeMillis();System.out.println("创建MemcachedClient消耗时间:"+(end1 - start1)); } @After public void tearDown() throws Exception { XmemcachedUtil.close(memcachedClient); } @Testpublic void main() {try {memcachedClient.flushAll();long start2 = System.currentTimeMillis();memcachedClient.set("wy", 3600, "wy test");long end2 = System.currentTimeMillis();System.out.println("向memcached中存值消耗时间:"+(end2 - start2));long start3 = System.currentTimeMillis();System.out.println(memcachedClient.get("wy").toString());long end3 = System.currentTimeMillis();System.out.println("从memcached中取值消耗时间:"+(end3 - start3));System.out.println(memcachedClient.add("wy123", 3600, "123") +" "+ memcachedClient.get("wy123").toString());memcachedClient.append("wy", "_123");System.out.println(memcachedClient.get("wy").toString());memcachedClient.replace("wy", 3600, "wy_123");System.out.println(memcachedClient.get("wy"));/** * Memcached是通过cas协议实现原子更新,所谓原子更新就是compare and set, * 原理类似乐观锁,每次请求存储某个数据同时要附带一个cas值, memcached比对这个cas值与当前存储数据的cas值是否相等, * 如果相等就让新的数据覆盖老的数据,如果不相等就认为更新失败, 这在并发环境下特别有用 */GetsResponse<Integer> result = memcachedClient.get("wy");long cas = result.getCas();//将wy的值修改为casif(!memcachedClient.cas("wy", 3600, "cas", cas)){System.err.println("cas error");}memcachedClient.set("http://www.baidu.com", 3600, "http://www.baidu.com");KeyIterator iterator = memcachedClient.getKeyIterator(AddrUtil.getOneAddress("127.0.0.1:11213"));while(iterator.hasNext()){ String key = iterator.next(); System.out.println("key = "+key);}System.out.println("value = " + memcachedClient.get("wy"));System.out.println("value = " + memcachedClient.get("http://www.baidu.com"));//memcachedClient.delete("wy");//System.out.println(memcachedClient.get("wy").toString());} catch (TimeoutException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (MemcachedException e) {e.printStackTrace();}} }?
?
三、与Spring框架集成????
? ? applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"xmlns:p="http://www.springframework.org/schema/p"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"><context:property-placeholder location="memcached.properties" /><bean id="memcachedClientBuilder" p:failureMode="${memcached.failureMode}"><!-- XMemcachedClientBuilder have two arguments.First is server list,and second is weights array. --><constructor-arg><list><bean /></property><!-- 分布策略 --><property name="sessionLocator"><bean /></property><!-- 序列化转换器 --><property name="transcoder"><bean /></property></bean><!-- Use factory bean to build memcached client --><bean id="memcachedClient" factory-bean="memcachedClientBuilder"factory-method="build" destroy-method="shutdown" /></beans>?MemcachedSpringTest.java
package com.wy;import static junit.framework.Assert.*;import java.util.concurrent.TimeoutException;import net.rubyeye.xmemcached.MemcachedClient;import net.rubyeye.xmemcached.exception.MemcachedException;import org.junit.Before;import org.junit.Test;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class MemcachedSpringTest {private ApplicationContext app;private MemcachedClient memcachedClient;@Beforepublic void init() {app = new ClassPathXmlApplicationContext("applicationContext.xml");memcachedClient = (MemcachedClient) app.getBean("memcachedClient");}@Testpublic void test() {try {// 设置/获取memcachedClient.set("wy", 36000, "set/get");assertEquals("set/get", memcachedClient.get("wy"));// 替换memcachedClient.replace("wy", 36000, "replace");assertEquals("replace", memcachedClient.get("wy"));// 移除memcachedClient.delete("wy");assertNull(memcachedClient.get("wy"));} catch (TimeoutException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (MemcachedException e) {e.printStackTrace();}}}?
?
四、SASL验证
???
?参考http://lguan.iteye.com/blog/1279537
?
?
?
?
?