读书人

HornetQ集群广播解析

发布时间: 2012-08-22 09:50:35 作者: rapoo

HornetQ集群广播解析
HornetQ支持集群方式来支持扩展性,集群中的节点借助JGroups组件来进行节点间的通信。在UDP方式中,节点在间隔时间内不断地向广播地址发送消息,而客户端的程序就可以通过监听这个广播地址,解析广播的内容就能够知道集群中有哪些主机。对于使用Java语言的开发者来说,HornetQ的Java客户端相关类屏蔽了这些底层代码,使得开发变得简单。然而对于其他语言,需要自己手动解析相关内容。
以HornetQ 2.1.2版本为准,HornetQ每个节点向广播地址广播的信息遵循以下格式:

节点名称的长度+节点名称+唯一标识长度+唯一标识+连接器个数+各个连接器信息

下面是解析的JAVA代码,包含的内部类主要负责控制读取。

客户端的调用只需要启动该类,然后获取hostInfoMap就能够获得集群中的主机和他们的连接信息以及备用连接信息。如果要客户端的主机信息能够实时更新,可以通过修改主机集合向客户端发消息的方式实现。



/*
 * Discovery.java
 *
 * Listens on a multicast group for cluster-node broadcast packets and keeps a
 * table of the nodes that have been heard from recently.
 *
 * Thread-safety: the packet parser keeps its read cursor in static fields, so
 * only ONE Discovery thread may run per JVM. The published tables
 * (getHost() and connectors) are concurrent maps so that other threads can
 * read them while the discovery thread updates them.
 */
package com.socket.multisocket;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class Discovery extends Thread {

  private static final String HOST = "host";
  private static final String PORT = "port";

  /** Milliseconds after which a node that stopped broadcasting is dropped. */
  private static final long timeout =
      Long.parseLong(System.getProperty("broadcast_host_timeout", "10000"));

  private static final String spliter = ":";

  /** Charset used for the length-prefixed UTF strings in a packet. */
  private static final Charset UTF_8 = Charset.forName("UTF-8");

  /**
   * Node table: unique node id -> list of connector pairs. Each pair is a
   * single-entry map from "host:port" of the main connector to "host:port" of
   * its backup connector (null when the node advertises no backup).
   */
  private static final Map<String, List<Map<String, String>>> hostInfoMap =
      new ConcurrentHashMap<String, List<Map<String, String>>>();

  /** Unique node id -> time (ms) of the last packet received from that node. */
  public final static Map<String, Long> connectors = new ConcurrentHashMap<String, Long>();

  /** Current read position inside the packet being parsed. */
  private static int currentIndex = 0;

  /** Number of connector pairs announced by the most recently parsed packet. */
  public static int connectorPairsSize = 0;

  private String broadcastAddress;
  private int broadcastPort;

  /**
   * @param broadcastAddress multicast group address to listen on
   * @param broadcastPort    UDP port of the multicast group
   */
  public Discovery(String broadcastAddress, int broadcastPort) {
    super();
    this.broadcastAddress = broadcastAddress;
    this.broadcastPort = broadcastPort;
  }

  /**
   * Returns the live node table (not a copy); it is updated by the discovery
   * thread as packets arrive and nodes expire.
   */
  public static Map<String, List<Map<String, String>>> getHost() {
    return hostInfoMap;
  }

  /**
   * Records that a packet from the given node was seen at the given time.
   *
   * @param key         unique node id
   * @param currentTime time of the sighting, in milliseconds
   */
  public void putConnector(String key, Long currentTime) {
    connectors.put(key, currentTime);
  }

  /**
   * Removes every node whose last sighting is older than the configured
   * timeout from both the sighting table and the node table.
   */
  public void validateConnectors() {
    // Iterate over a snapshot so removals cannot disturb the iteration.
    Set<Entry<String, Long>> snapshot = new HashSet<Entry<String, Long>>(connectors.entrySet());
    for (Entry<String, Long> entry : snapshot) {
      if (entry.getValue() + timeout < System.currentTimeMillis()) {
        connectors.remove(entry.getKey());
        hostInfoMap.remove(entry.getKey());
      }
    }
  }

  /**
   * Joins the multicast group and processes broadcast packets until the
   * socket fails. A packet that cannot be parsed is reported and skipped
   * instead of terminating the thread.
   *
   * @throws RuntimeException if the broadcast address cannot be resolved
   */
  @Override
  public void run() {
    InetAddress group = null;
    MulticastSocket server = null;
    try {
      group = InetAddress.getByName(broadcastAddress);
      server = new MulticastSocket(broadcastPort);
      server.joinGroup(group);
      final byte[] data = new byte[65535];
      DatagramPacket recv = new DatagramPacket(data, data.length);
      for (;;) {
        // The packet object is reused; make the whole buffer available again.
        recv.setLength(data.length);
        server.receive(recv);
        try {
          String uniqueName = ParseRecieveData.parsebytes(recv.getData());
          putConnector(uniqueName, System.currentTimeMillis());
        } catch (RuntimeException e) {
          // Malformed or foreign datagram: drop it and keep listening.
          e.printStackTrace();
        } finally {
          ParseRecieveData.release();
        }
        validateConnectors();
      }
    } catch (UnknownHostException e) {
      throw new RuntimeException("can not find this broadcast address:" + broadcastAddress, e);
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (server != null) {
        try {
          server.leaveGroup(group);
        } catch (IOException ignored) {
          // Best effort: the socket is closed right below anyway.
        }
        server.close();
      }
    }
  }

  /**
   * Stateful reader for one broadcast packet. Every getXxx method reads at the
   * given index and then moves the shared cursor (currentIndex) just past the
   * bytes it consumed.
   */
  private static class ParseRecieveData {

    private static final byte TYPE_BOOLEAN = 0;
    private static final byte TYPE_INT = 1;
    private static final byte TYPE_LONG = 2;
    private static final byte TYPE_STRING = 3;

    /**
     * Parses one packet laid out as: node name, unique id, connector-pair
     * count, connector pairs. The parsed pairs are stored in the node table
     * under the unique id.
     *
     * @param bytes raw packet buffer, parsed from offset 0
     * @return the unique id of the broadcasting node
     */
    public static String parsebytes(byte[] bytes) {
      release();
      int nodeIdLength = getInt(bytes, currentIndex);
      getString(nodeIdLength, bytes, currentIndex); // node name is skipped
      int uniqueIdLength = getInt(bytes, currentIndex);
      String uniqueName = getString(uniqueIdLength, bytes, currentIndex);
      connectorPairsSize = getInt(bytes, currentIndex);
      List<Map<String, String>> connectorList = new ArrayList<Map<String, String>>();
      // A node may advertise several connector pairs.
      for (int i = 0; i < connectorPairsSize; i++) {
        connectorList.add(parseConnectorPairInfo(bytes));
      }
      hostInfoMap.put(uniqueName, connectorList);
      return uniqueName;
    }

    /**
     * Parses one connector pair: the main connector, a one-byte flag, and the
     * backup connector when the flag is non-zero.
     *
     * @param bytes raw packet buffer
     * @return single-entry map from the main "host:port" to the backup
     *         "host:port" (null value when there is no backup)
     */
    private static Map<String, String> parseConnectorPairInfo(byte[] bytes) {
      // The main connector must be read exactly once; reading it twice would
      // push the cursor past the backup flag and corrupt the rest of the parse.
      Map<String, String> mainConnectorMap = parseConnectorInfo(bytes);
      boolean hasBackup = getByte(bytes, currentIndex) != 0;
      Map<String, String> backConnectorMap = hasBackup ? parseConnectorInfo(bytes) : null;

      // Only host:port is kept; the other connector parameters are dropped.
      String mainConnector = mainConnectorMap.get(HOST) + spliter + mainConnectorMap.get(PORT);
      String backConnector = backConnectorMap == null
          ? null
          : backConnectorMap.get(HOST) + spliter + backConnectorMap.get(PORT);

      Map<String, String> connectorPairMap = new HashMap<String, String>();
      connectorPairMap.put(mainConnector, backConnector);
      return connectorPairMap;
    }

    /**
     * Parses one connector: name, factory class name, parameter count, then
     * that many (key, type byte, value) triples.
     *
     * @param bytes raw packet buffer
     * @return the connector parameters with every value rendered as a string
     * @throws RuntimeException if a parameter carries an unknown type byte
     */
    private static Map<String, String> parseConnectorInfo(byte[] bytes) {
      Map<String, String> paramMap = new HashMap<String, String>();
      int nameLength = getInt(bytes, currentIndex);
      getString(nameLength, bytes, currentIndex); // connector name is skipped
      int factoryLength = getInt(bytes, currentIndex);
      getString(factoryLength, bytes, currentIndex); // factory class is skipped
      int paramPairsSize = getInt(bytes, currentIndex);
      for (int j = 0; j < paramPairsSize; j++) {
        int keyLength = getInt(bytes, currentIndex);
        String key = getString(keyLength, bytes, currentIndex);
        byte valueType = getByte(bytes, currentIndex);
        String value;
        if (valueType == TYPE_STRING) {
          int valueLength = getInt(bytes, currentIndex);
          value = getString(valueLength, bytes, currentIndex);
        } else if (valueType == TYPE_BOOLEAN) {
          value = getByte(bytes, currentIndex) == 0 ? "false" : "true";
        } else if (valueType == TYPE_INT) {
          value = String.valueOf(getInt(bytes, currentIndex));
        } else if (valueType == TYPE_LONG) {
          value = String.valueOf(getLong(bytes, currentIndex));
        } else {
          throw new RuntimeException("invalid type");
        }
        paramMap.put(key, value);
      }
      return paramMap;
    }

    /**
     * Reads a big-endian 32-bit int and advances the cursor by 4.
     *
     * @param bytes raw packet buffer
     * @param index offset of the first byte
     * @return the decoded value
     */
    public static int getInt(byte[] bytes, int index) {
      int val = (bytes[index] & 0xff) << 24
          | (bytes[index + 1] & 0xff) << 16
          | (bytes[index + 2] & 0xff) << 8
          | (bytes[index + 3] & 0xff);
      setCurrentIndex(index + 4);
      return val;
    }

    /**
     * Reads a single byte and advances the cursor by 1.
     *
     * @param bytes raw packet buffer
     * @param index offset of the byte
     * @return the byte at that offset
     */
    public static byte getByte(byte[] bytes, int index) {
      byte b = bytes[index];
      setCurrentIndex(index + 1);
      return b;
    }

    /**
     * Reads a big-endian 16-bit short and advances the cursor by 2.
     *
     * @param bytes raw packet buffer
     * @param index offset of the first byte
     * @return the decoded value
     */
    public static short getShort(byte[] bytes, int index) {
      short value = (short) (bytes[index] << 8 | bytes[index + 1] & 0xFF);
      setCurrentIndex(index + 2);
      return value;
    }

    /**
     * Reads a string whose on-wire layout depends on its character count:
     * fewer than 9 characters are stored as raw 16-bit chars; fewer than 0xfff
     * as a 16-bit byte length followed by UTF bytes; anything longer as a
     * 32-bit byte length followed by the bytes.
     *
     * @param length character count that was read just before the string
     * @param bytes  raw packet buffer
     * @param index  offset of the first byte after the character count
     * @return the decoded string
     * @throws IllegalArgumentException if the encoded length runs past the buffer
     */
    public static String getString(int length, byte[] bytes, int index) {
      if (length < 9) {
        StringBuilder chars = new StringBuilder();
        for (int i = 0; i < length; i++) {
          chars.append((char) getShort(bytes, index));
          index = index + 2;
        }
        return chars.toString();
      }
      if (length < 0xfff) {
        // The byte-length prefix is unsigned on the wire.
        int utfLength = getShort(bytes, index) & 0xffff;
        index = index + 2;
        requireAvailable(bytes, index, utfLength);
        // Decode explicitly as UTF-8 rather than with the platform default,
        // which varies between machines.
        String text = new String(bytes, index, utfLength, UTF_8);
        setCurrentIndex(index + utfLength);
        return text;
      }
      int longLength = getInt(bytes, index);
      index = index + 4;
      requireAvailable(bytes, index, longLength);
      // NOTE(review): this branch still decodes with the platform default
      // charset, exactly as before; the encoding the broadcaster uses for
      // strings this long was not verified -- confirm before relying on it.
      String text = new String(bytes, index, longLength);
      setCurrentIndex(index + longLength);
      return text;
    }

    /**
     * Reads a big-endian 64-bit long and advances the cursor by 8.
     *
     * @param bytes raw packet buffer
     * @param index offset of the first byte
     * @return the decoded value
     */
    public static long getLong(byte[] bytes, int index) {
      setCurrentIndex(index + 8);
      return ((long) bytes[index] & 0xff) << 56
          | ((long) bytes[index + 1] & 0xff) << 48
          | ((long) bytes[index + 2] & 0xff) << 40
          | ((long) bytes[index + 3] & 0xff) << 32
          | ((long) bytes[index + 4] & 0xff) << 24
          | ((long) bytes[index + 5] & 0xff) << 16
          | ((long) bytes[index + 6] & 0xff) << 8
          | ((long) bytes[index + 7] & 0xff);
    }

    /** Resets the read cursor to the start of the buffer. */
    public static void release() {
      currentIndex = 0;
    }

    /** Moves the read cursor to the given offset. */
    public static void setCurrentIndex(int index) {
      currentIndex = index;
    }

    /** Rejects a length field that is negative or runs past the end of the buffer. */
    private static void requireAvailable(byte[] bytes, int index, int count) {
      if (count < 0 || index < 0 || count > bytes.length - index) {
        throw new IllegalArgumentException(
            "malformed packet: need " + count + " bytes at offset " + index);
      }
    }
  }
}

读书人网 >软件架构设计

热点推荐