<实践>Voldemort与Objot集成的WAP高性能方案与简单实现(二)
2、IOC、AOP及请求处理
ServiceHandler.java//负责处理WAP请求分发和初始化存储
public class ServiceHandler {//URI缓存private ConcurrentHashMap<String, ServiceInfo> infos = new ConcurrentHashMap<String, ServiceInfo>(128, 0.8f, 32);private Container container;public void init(StoreRepository repository) throws Exception {//初始化Objot容器container = Services.build(repository);}public String handle(ServletRequest request, ServletResponse response) throws Exception {//解析URIString uri = ((HttpServletRequest)request).getRequestURI();String name = uri.substring(uri.lastIndexOf('/') + 1);ServiceInfo info = getInfo(name);//处理URL参数,只有一个参数名p,即p=a&p=b&p=c...Object[] ps = parse(request.getParameterValues("p"), info.meth.getParameterTypes());String result = (String)info.meth.invoke(container.get(info.cla), ps);//如果有需要填充到页面的数据,则加载ModelMap map = ps.length > 0 && ps[ps.length - 1] instanceof ModelMap ? (ModelMap)ps[ps.length - 1] : null;if(map != null)for(String s : map.keySet())request.setAttribute(s, map.get(s));return result;}//根据URI解析类名和方法名,包在ServiceInfo中private ServiceInfo getInfo(String name) {ServiceInfo inf = infos.get(name);if(inf != null)return inf;try {int x = String2.index(name, "-", 0);inf = getInfo(Class2.packageName(Do.class).concat(".").concat(name.substring(0, x)), String2.sub(name,x + 1));}catch(RequestException e) {throw e;}catch(Exception e) {throw new RequestException("Service not found: ".concat(name), e);}if(inf == null)throw new RequestException("Service not found: ".concat(name));infos.put(name, inf);return inf;}private ServiceInfo getInfo(String cla, String method) throws Exception {Class<?> c = Class.forName(cla);if(Mod2.match(c, Mod2.PUBLIC))for(Method m : c.getMethods())if(m.getName().equals(method))return new ServiceInfo(m);return null;}//解析HTTP参数,对应到DoX的方法private Object[] parse(String[] values, Class<?>[] types) throws Exception {int needN = types.length == 0 ? 0 : types[types.length - 1] == ModelMap.class ? types.length - 1 : types.length;int paramN = values != null ? values.length : 0;if(needN != paramN)throw new Exception("Arguments mismatch");Object[] os = null;if(types.length > 0 && types[types.length - 1] == ModelMap.class) {os = new Object[needN + 1];os[needN] = new ModelMap();}elseos = new Object[needN];for(int i = 0; i < needN; i++)os[i] = parse(values[i], types[i]);return os;}private static Object parse(String str, Class<?> ct) throws Exception {if(ct == int.class)return Integer.parseInt(str);else if(ct == long.class)return Long.parseLong(str);else if(ct == String.class)return str;elsethrow new Exception("Unsupported convertion: java.lang.String -> ".concat(ct.getName()));}}
Services.java//初始化各个服务,编解码器(因为是使用byte[]存储,json或java serial都太慢),使用Objot IOC和AOP(负责打开Voldemort Store)
public class Services {public static Container build(final StoreRepository repository) throws Exception {//初始化编解码器,存储用final Codec codec = new Codec() {String modelPrefix = Class2.packageName(Id.class).concat(".");@Overrideprotected Object byName(String name, Object ruleKey) throws Exception {if(name.length() == 0)return HashMap.class;return Class.forName(modelPrefix.concat(name));}@SuppressWarnings("unchecked")@Overrideprotected String name(Object o, Class<?> c, Object ruleKey) throws Exception {if(o instanceof HashMap)return "";return Class2.selfName(c);}};//初始化版本冲突解决器final VectorClockInconsistencyResolver<byte[]> resolver = new VectorClockInconsistencyResolver<byte[]>();final Weaver w = new Weaver(Transaction.As.class) {@Overrideprotected Object forWeave(Class<? extends Aspect> ac, Method m) throws Exception {if(!m.isAnnotationPresent(Service.class))return this;//是否需要数据操作的AOPif(ac == Transaction.As.class)return m.isAnnotationPresent(Transaction.Any.class) ? this : null;return this;}};final Container sess = new Factory() {{bind(StoreRepository.class);bind(VectorClockInconsistencyResolver.class);bind(Codec.class);}@Overrideprotected Object forBind(Class<?> c, Bind b) throws Exception {return c == StoreRepository.class ? b.obj(repository) : c == VectorClockInconsistencyResolver.class ? b.obj(resolver) : c == Codec.class ? b.obj(codec) : b;}}.create(null);Factory req = new Factory() {@Overrideprotected Object forBind(Class<?> c, Bind b) throws Exception {if(sess.bound(c))return b.mode(Inject.Parent.class);if(c.isSynthetic())return b;return b.cla(w.weave(c));}};for(Class<?> c : Class2.packageClasses(Do.class))if(Mod2.match(c, Mod2.PUBLIC, Mod2.ABSTRACT))req.bind(c);return req.create(sess, true);}}
Transaction.java //Any表示不需要数据操作
public @interface Transaction {@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.METHOD)public @interface Any {}//数据操作Aspectpublic static class As extends Aspect {@Injectpublic StoreRepository repository;@Injectpublic Data data;@Overrideprotected void aspect() throws Throwable {if(data.store == null)data.store = repository.getRoutedStore("vortex");Target.invoke();}}}
Do.java //所有服务的父类
public class Do {@Injectpublic Data data;@Retention(RetentionPolicy.RUNTIME)@Inheritedpublic @interface Service {}}
Data.java //Voldemort数据操作类,以VectorClock方式解决版本问题
public class Data {public Store<ByteArray, byte[]> store;@Injectpublic VectorClockInconsistencyResolver<byte[]> resolver;@Injectpublic Codec codec;public <T extends Id<T>> void put(T model) throws Exception {//以id来判定是新加还是更改if(model.id <= 0) {model.id = nextId(model.getClass());}ByteArray key = new ByteArray(model.getClass().getSimpleName().concat("::" + model.id).getBytes());byte[] value = String2.utf(codec.enc(model, null));Versioned<byte[]> v = fix(store.get(key));if(v == null)v = new Versioned<byte[]>(value);elsev.setObject(value);store.put(key, v);}public <T extends Id<T>> T get(Class<T> c, long id) throws Exception {ByteArray key = new ByteArray(c.getSimpleName().concat("::" + id).getBytes());Versioned<byte[]> v = fix(store.get(key));return codec.dec(String2.utf(v.getValue()), c, null);}//取得Model的id下一个值,自增长方式,多节点会有问题!!!public long nextId(Class<?> c) {ByteArray counterKey = new ByteArray(c.getSimpleName().concat("-Counter").getBytes());long nextId = 1L;byte[] id = new byte[8];Versioned<byte[]> fix = fix(store.get(counterKey));if(fix == null) {Bytes.writeS8(id, 0, nextId);fix = new Versioned<byte[]>(id);}else {nextId = Bytes.readU8(fix.getValue(), 0) + 1;Bytes.writeS8(id, 0, nextId);fix.setObject(id);}store.put(counterKey, fix);return nextId;}//解决版本冲突private Versioned<byte[]> fix(List<Versioned<byte[]>> vs) {if(vs == null || vs.isEmpty())return null;return resolver.resolveConflicts(vs).get(0);}}