读书人

使用mina来开发socket程序

发布时间: 2012-07-22 19:31:17 作者: rapoo

使用mina来开发socket程序

mina早于netty,出自同一人之手。个人感觉netty更棒但项目老大要求使用mina,所以就学习一下mina啦。学习的成果总结如下。

?

使用mina开发socket只需要IoAcceptor、IoHandlerAdapter、NioSocketConnector、ProtocolCodecFactory等几个类基本上就可以进行开发了。

?

首先是一个Server(简单示例,并非完整代码),负责Mina服务端的启停。

?

/** * @author shenbaise(shenbaise1001@126.com) * @date 2012-2-10 * TODO Monitor Server2 将Mina交给spring管理,可以通过web界面来管理Mina的起停,参数设置等等 */@Controller("monitorServer2")public class MonitorServer2 {private static final int PORT = Constant.remote_port; //定义监听端口 private IoAcceptor acceptor = null;private InetSocketAddress socketAddres = null;@AutowiredMonitorServerHandler monitorServerHandler;/** * 启动Server * @throws IOException */@RequestMapping("start")public void init() throws IOException{if(null == acceptor)//如果 上一个Acceptor没有被关闭,则新创建的Acceptor无法被绑定,同时上一个Acceptor将无法再被关闭。acceptor = new NioSocketAcceptor();socketAddres = new InetSocketAddress(Constant.remote_address,PORT);        // Add two filters : a logger and a codec        acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );        acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));           // Attach the business logic to the server//        acceptor.setHandler( monitorServerHandler );        acceptor.setHandler(new MonitorServerHandler());        // Configurate the buffer size and the iddle time        acceptor.getSessionConfig().setReadBufferSize( 2048 );        acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );        acceptor.getSessionConfig().setUseReadOperation(true);         // And bind !        acceptor.bind(socketAddres);}/** * 销毁并退出 */@RequestMapping("stop")public void destroy(){if (null!=acceptor) {acceptor.unbind(socketAddres);acceptor.getFilterChain().clear();// 清空Filter chain,防止下次重新启动时出现重名错误acceptor.dispose();// 可以另写一个类存储IoAccept,通过spring来创建,这样调用dispose后也会重新创建一个新的。或者可以在init方法内部进行创建。acceptor = null;//System.exit(0);将导致容器停止}}    public static void main(String[] args) throws IOException {    MonitorServer2 server = new MonitorServer2();    server.init();        try {Thread.sleep(600000);server.destroy();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}    }}
?

一个对应的Handler,负责业务逻辑处理

?

@Service("monitorServerHandler")public class MonitorServerHandler extends IoHandlerAdapter {@SuppressWarnings("unused")@Autowiredprivate MCacheClient cache;private int count = 0;public static final String ALERT_MSG_CACHE_KEY = "1457261687388459164";@Overridepublic void exceptionCaught(IoSession session, Throwable cause)throws Exception {cause.printStackTrace();super.exceptionCaught(session, cause);}/** * 将接收到的消息存库或者缓存 */@Overridepublic void messageReceived(IoSession session, Object message)throws Exception {/*服务端的逻辑一般都在这里写*/}@Overridepublic void messageSent(IoSession session, Object message) throws Exception {System.out.println("SERVER=>messageSent:" + (String)message);super.messageSent(session, message);}@Overridepublic void sessionClosed(IoSession session) throws Exception {System.out.println("SERVER=>sessionClosed: current sessionId:"+session.getId());super.sessionClosed(session);}@Overridepublic void sessionCreated(IoSession session) throws Exception {System.out.println("SERVER=>sessionCreated: current sessionId:"+session.getId());super.sessionCreated(session);}@Overridepublic void sessionIdle(IoSession session, IdleStatus status)throws Exception {System.out.println("SERVER=>sessionIdle:" + session.getIdleCount( status ));super.sessionIdle(session, status);}@Overridepublic void sessionOpened(IoSession session) throws Exception {System.out.println("SERVER=>sessionOpened: current sessionId:"+session.getId());super.sessionOpened(session);}}
?

对应的Client端也是类似的:一个类负责连接Server,一个类负责业务逻辑的处理。

?

Client端:

?

public class MonitorClient {private static final int PORT = 10000;public  static String address = "127.0.0.1";private static InetSocketAddress socketAddres = new InetSocketAddress(address,PORT);private NioSocketConnector connector = null; /** * 启动(在listener中启动是需要新建一个线程来连接Server,否则web容器会阻塞而无法启动。) */public void init(){connector = new NioSocketConnector();connector.getFilterChain().addLast( "logger", new LoggingFilter() ); connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new MyCodeFactory( Charset.forName( "UTF-8" )))); //设置编码过滤器 connector.setConnectTimeoutMillis(3000); connector.setHandler(new MonitorClientHandler());//设置事件处理器 ConnectFuture cf = connector.connect(socketAddres);//建立连接 cf.awaitUninterruptibly();cf.getSession().getCloseFuture().awaitUninterruptibly();//等待连接断开 }/** * 销毁 */public void destroy(){socketAddres = null;connector.dispose();}public static void main(String[] args) { //MonitorClient client = new MonitorClient();//client.init();//try {//Thread.sleep(70000);////client.destroy();//} catch (InterruptedException e) {//e.printStackTrace();//}ClientThread thread = new ClientThread();Thread cThread = new Thread(thread);cThread.start();}/** * 启动(在新建线程中连接Server) */public void init2(){ClientThread thread = new ClientThread();Thread cThread = new Thread(thread);cThread.start();}}

Client端的Handler:

?

public class MonitorClientHandler extends IoHandlerAdapter {int count = 0;StringBuilder sb = new StringBuilder();int pos = 0;//@Autowired BaseDao baseDao;@Overridepublic void exceptionCaught(IoSession session, Throwable cause)throws Exception {cause.printStackTrace();super.exceptionCaught(session, cause);}@Overridepublic void messageReceived(IoSession session, Object message)throws Exception {/*Client端的逻辑会在这里*/super.messageReceived(session, message);}@Overridepublic void messageSent(IoSession session, Object message) throws Exception {                 /*或者会在这里,当然每个地方都会有一些东西需要处理,例如创建、关闭、空闲等等*/super.messageSent(session, message);}@Overridepublic void sessionClosed(IoSession session) throws Exception {/*这里可以实现重连,但是这会涉及到你代码的一些资源分配或调度逻辑,跑一个线程进行重连*/super.sessionClosed(session);}@Overridepublic void sessionCreated(IoSession session) throws Exception {System.out.println("CLIENT=>sessionCreated: current sessionId:"+session.getId());super.sessionCreated(session);}@Overridepublic void sessionIdle(IoSession session, IdleStatus status)throws Exception {         System.out.println("CLIENT=>sessionIdle:" + session.getIdleCount( status ));super.sessionIdle(session, status);}@Overridepublic void sessionOpened(IoSession session) throws Exception {System.out.println("CLIENT=>sessionOpened: current sessionId:"+session.getId());super.sessionOpened(session);}}

最后还有ProtocolCodecFactory,也是比较重要的。

一个很简单的实现如下:

?

public  class MyCodeFactory implements ProtocolCodecFactory {private LineDelimiter enLineDelimiter = new LineDelimiter(Constant.CHAR2);    private final TextLineEncoder encoder;    private final TextLineDecoder decoder;    /*final static char endchar = 0x1a;*/    final static String endchar = Constant.CHAR2;        public MyCodeFactory() {        this(Charset.forName("gb2312"));    }        public MyCodeFactory(Charset charset) {     encoder = new TextLineEncoder(charset, enLineDelimiter);         decoder = new TextLineDecoder(charset, enLineDelimiter);                  }public ProtocolDecoder getDecoder(IoSession session) throws Exception {// TODO Auto-generated method stubreturn decoder;}public ProtocolEncoder getEncoder(IoSession session) throws Exception {// TODO Auto-generated method stubreturn encoder;}public int getEncoderMaxLineLength() {        return encoder.getMaxLineLength();    }    public void setEncoderMaxLineLength(int maxLineLength) {        encoder.setMaxLineLength(maxLineLength);    }    public int getDecoderMaxLineLength() {        return decoder.getMaxLineLength();    }    public void setDecoderMaxLineLength(int maxLineLength) {        decoder.setMaxLineLength(maxLineLength);    }}

mina的TextLineCodecFactory默认以换行符来进行分隔,我们也可以自定义分隔符。以TextLineCodecFactory为例,mina在发送时会在发送内容之后自动加一个换行符,在接收时会按换行符来截取收到的内容。

?

在web工程中启动mina客户端时应该新开一个线程,不要用主线程。

读书人网 >软件架构设计

热点推荐