棘手的Socket问题,涉及到多线程。请大家帮忙!
为了把问题说明清楚,把我写的代码贴出来(共三部分),请大家为我解答:
一个是Socket的服务端(用Java编写的),另一个是Socket的客户端(这个Socket客户端C++写的)。Socket的服务端可以向Socket的客户端发送信令,这个大家都知道,但现在Socket的服务端必须要抛出一个多线程便于一直接收Socket客户端发过来的信令,不然若客户端有什么请求,我就不知道了.
注:客户端必须先向服务端链接,这样服务端才得到一个链接(Socket).
下面详细说明一下:
服务器先起来,然后客户端向服务器端建立一个连接,服务器端收到连接后,用一个全局变量保存起来;
在这里保存起来有两个目的:1.让服务端保持这个链接,不需要以后重新再链接,2.可以在其它方法里(比如C部分)利用这个保存的Socket向客户端发信。
各个部分的含义:
A部分,服务器端收到客户端的连接,并将这个链接(socket)保存起来,注意,服务器端收到连接后并不是立即给客户端发送信令,具体向客户端发信的是通过C部分实现的;然后new SocketServerThread(socket).start(),便于可以一直(随时)接收客户端的信令。
B部分,接收客户端的信令。因为要可以随时接收不同客户端的信令(业务要求),所以要用到多线程。当通过C部分向客户端发送信令后(送信的内容是xml体,里面包含sessionid等其它数据),那么只能(因为要随时接收不同客户端的信令,所以用一个while循环)通过这个多线程接收了。先发送,再等待返回。那当多线程收到客户端信令的返回时,怎么通知C部分(已经收到返回了)呢?所以就在多线程里用一个全局Map把这个结果保存起来,便于C部分可以通过轮询这个变量,已得知客户端已返回。那怎么确定信息是C的哪次发信返回的结果呢?就是通过sessionid的值来确定的(下面有说明),即我每次在送信前记住这个sessionid,在收信时就找对应的结果(结果也是一个xml体)。每次送信的sessionid都不相同。这是一个瓶颈,尤其在多线程里,这正是我提问题的原因所在。
C部分,利用全局Map保存的socket向客户端发送信令;轮询这个变量,通过sessionid查找对应的结果,看多线程(B部分)是否收到客户端信令的返回。
这是一个会议系统,只要创建会议(是通过用户在页面点击来调用C部分的方法来发起通信的),都会送信。
这是每次送信的内容,是一个xml体,即:
- Java code
<iabi> <cmd id= "3000 " sessionid= "1412412341 "> <confid> 88888888 </confid> </cmd> </iabi>
A部分:
- Java code
public class SocketServer extends Thread { private Logger log= Logger.getLogger(SocketServer.class.getName()); private ServerSocket serverSocket; private int listenPort =8188; public SocketServer(int listenPort) { this.listenPort = listenPort; } public static Map socketMap=new HashMap();//保存socket的HashMap public static Map resultMap=new HashMap();//保存发出通信的的返回结果Map private Socket socket; private boolean isLoop= false; public void run(){ isLoop=true; try { serverSocket= new ServerSocket(listenPort); log.info("Starting listen......"); String ccsIp=""; CcsInfo ccsInfo=null; while(isLoop){ socket = serverSocket.accept(); log.info("Listening->RemoteSocketAddress:"+socket.getRemoteSocketAddress()+" InetAddress:"+socket.getInetAddress()); ccsIp=socket.getRemoteSocketAddress().toString().trim(); ccsIp=ccsIp.substring(1,ccsIp.indexOf(":")); log.info("RemoteSocketIp:"+ccsIp); socketMap.put(ccsIp,socket); log.info("Having listen the IP:" +ccsIp+" The ccsID:"+ccsInfo.getCcsId()); new SocketServerThread(socket).start(); } } catch (Exception e) { isLoop = false; log.error(" "+e); } } }B部分:
- Java code
public class SocketServerThread extends Thread { private Logger log= Logger.getLogger(SocketServerThread.class.getName()); private Socket socket; private int result; private boolean isLoop; ByteArrayOutputStream buf = new ByteArrayOutputStream(); public SocketServerThread(Socket s){ this.socket = s; } public void run(){ isLoop=true; result=0; InputStream in=null; try { in =new BufferedInputStream(socket.getInputStream()); } catch (IOException e){ log.error("The CCS "+socket.getRemoteSocketAddress()+" is unusual:"+e); } int readLen=1024; int count=0; byte[] readBuf=new byte[readLen]; while(true&&isLoop){ count=0; try{ do{ count=in.read(readBuf); if(count==-1){ isLoop=false; throw new IOException("The receive data length is not right"); } buf.write(readBuf,0,count); }while (count==readLen); log.info("Receive:"+new String(buf.toByteArray())); }catch(IOException ex){ log.error("The CCS "+socket.getRemoteSocketAddress()+" correspondence is unusual:"+ex); } //收到的结果是一个xml体,然后解析出sessionid和通信的结果result SocketServer.resultMap.put(sessionid,result);//保存返回结果 } }}
C部分:
- Java code
public class Communicate { public synchronized String conferenceComm(String ip){ OutputStream out=null; try { out = socket.getOutputStream(); } catch (IOException e){ log.error("Connecting the CCS that ip is "+socket.getInetAddress()+" is exception:"+e); return null; } String sessionid="1412412341";//这个sessionid是随机产生的,可以保证每次产生的不一样. //先发送 String cmdXml="<iabi> <cmd id='3000' sessionid="+sessionid+"> <confid> 88888888 </confid> </cmd> </iabi>"; try { byte[] response =cmdXml.getBytes(); out.write(response); out.flush(); } catch (IOException e){ log.error("Sending command to CCS that ip is "+socket.getInetAddress()+" is exception:"+e); return null; } log.info("Send:"+cmdXml); String result=null; int i=0; while(i<5){//轮询 try { Thread.sleep(100); } catch (InterruptedException e){ log.error(e); } i++; result=SocketServer.resultMap.get(sessionid);//根据sessionid查找对应的通信结果. if(result!=null){ break; } } return result; } }若并发的创建会议,就有瓶颈了,因为多线程操作Map,性能方面很不好,还要在C部分轮循.本人想了很久,要满足这种业务,还要很好的性能,自己没有找到好办法.请各位高手帮忙,先谢谢了!
[解决办法]
使用mina,见http://mina.apache.org
一切都很轻松了
[解决办法]
我刚写了一个聊天室程序,除了没用非阻塞通讯,效率上还不错,只是代码太多。
就说一下思路吧,也许对你有帮助。
首先我传的是对象,如果你觉得这样效率低,那就不用往下看了,只是为了扩展性才这样做的。
先定义了一个基类包,主要属性有ip,到达时间,是否关闭
然后是一个字符串包(专门传送字符类数据,目前这个就够了),采用的是类似http协议的方式,分为包头(http中ContentType:...)和包体(<html...),当然存放使用面向对象的方式,用一个map存包头(这里我们就可以自己定义了)信息,一个string存内容。
其次,服务器端用的是线程池,每接收一个客户端都放线程池里。
服务器端存放客户端处理的线程池(假设每个线程名叫C),另外再开2个线程,1个监听客户端线程A,一个向客户端发送线程B,创建1个同步列表,A线程每得到一个客户端就创建C并把同步列表传进去,再把C仍入线程池中运行,C循环接收对应客户端socket信息,每接收一个就仍入同步队列直到接收到结束标志,这样不必等待,能直接接收下一个,B线程循环读取同步队列,一旦有数据就处理,并拥有C线程的列表(b创建时用构造函数放进去的),所以可以根据接收到的信息,发送通知所有客户端。
做的过程中碰到一些问题,现总结一下
ObjectInputStream的构造函数是阻塞的,后面代码不能运行,必须先创建ObjectOutputStream
ObjectOutputStream 和ObjectInputStream要一一对应,原因是ObjectInputStream的构造函数会先读取4个也不知道5个字节的信息,其实就是ObjectOutputStream 的写入的,这也是为什么会阻塞的原因。
本来同步用Piped流做的但有问题所以改成同步列表了和另一个解决了但不知原因的问题,希望高手能帮我看看
见此贴http://topic.csdn.net/u/20071116/21/b4a89f71-ce40-41d2-8c1a-3877f1f678cc.html
[解决办法]
mark一下,有时间再看。
[解决办法]
to hrtc
你的想法跟我一开始看到这个问题的想法差不多,一个收信线程专门负责收信,然后把收到的信息put到一个queue(队列)里,然后另一个送信线程专门从queue里取信息,然后把取出的信息送信。但是似乎跟LZ的想法有出入。在LZ的另一贴里,我就一直提出一个问题,到底LZ是什么时候送信的?从LZ的解释以及LZ的代码上来看,LZ的送信和收信是同步的,即LZ本意想在送完信以后,就一直等待返信结果,也就是LZ的C部分代码的后面,循环5次去取返信结果,取不到就返回null,估计调用处理把null当作timeout处理的。
to LZ
我还是一直有疑问,为什么送信以后非要等待收信处理完?从这样的处理试样上,我只能这么推测,客户端向服务器端发送一个创建会议请求,然后服务器端开始调用送信方法,并且等待送信的返信结果,收到返信结果后,服务器端再把这个结果返回给客户端的请求,这样,算一个客户端的创建会议请求结束。是这样吗?如果是这样的,那么你的瓶颈问题存在是没办法的,因为你这样本身就是要求送信和返信同步,也就是应答式的。也许我不理解你的试样吧,我觉得你这里没必要送信和返信同步,也就是送信结束后就立刻返回,即给客户端返回一个创建会议ok,然后采用LS的方法,收信线程(即你的B部分)把返信结果存到一个queue里,然后再创建一个送信线程,从queue中take取出返信结果,然后再通过socket给创建会议的客户端传输信息。这样一切的信息传送,都在socket的流中去完成。
[解决办法]
mark 帮顶