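Time: 2004-12-02
Building a stable, efficient client-server socket communication service is not a simple job.
I have been trying out Netty2 (http://gleamynode.net/dev/projects/netty2/) over the past few days and like it a lot, so I would like to share the experience.

In SMS application development we often have to talk over sockets to the gateways of China Mobile, China Unicom and PHS (小灵通). The vendor-supplied APIs are not efficient, while writing your own API costs a lot of time and effort and its stability is still hard to guarantee. With Netty2 the structure of the code becomes clearer and development time drops noticeably. Below is a simple example of an API that talks to the PHS SMS gateway.

The PHS SMS protocol is similar to SMPP and CMPP:
There are login, submit, deliver, exit and other packets.
Each packet consists of a header and a body.
The header has three fields, totalLength, commandId and sequenceId. Each field takes 4 bytes, 12 bytes in total.
The body is defined differently for each packet type; see the PHS protocol specification for details.

The following shows how to implement this protocol with Netty: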
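The 12-byte header described above can be tried out with nothing more than java.nio.ByteBuffer. This is a minimal sketch, not code from the protocol specification: the class name PhsHeader and its methods are made up for illustration, and it assumes the fields are sent in big-endian order, which is ByteBuffer's default and what the code posted in this thread relies on.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper: encodes and decodes the 12-byte packet header. */
final class PhsHeader {
    static final int LENGTH = 12; // 3 fields x 4 bytes

    final int totalLength; // header + body, in bytes
    final int commandId;   // packet type
    final int sequenceId;  // matches a response to its request

    PhsHeader(int totalLength, int commandId, int sequenceId) {
        this.totalLength = totalLength;
        this.commandId = commandId;
        this.sequenceId = sequenceId;
    }

    /** Writes the three fields in wire order (ByteBuffer defaults to big-endian). */
    void writeTo(ByteBuffer buf) {
        buf.putInt(totalLength);
        buf.putInt(commandId);
        buf.putInt(sequenceId);
    }

    /** Returns null, without consuming anything, if fewer than 12 bytes are available. */
    static PhsHeader readFrom(ByteBuffer buf) {
        if (buf.remaining() < LENGTH) {
            return null;
        }
        // Java evaluates arguments left to right, so the fields are read in wire order.
        return new PhsHeader(buf.getInt(), buf.getInt(), buf.getInt());
    }
}
```

For the login packet posted later in this thread, totalLength would be 42: 12 bytes of header plus a 30-byte body.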

Time: 2004-12-02
1. Implement Netty's MessageRecognizer interface
[code:1]
package net.smsfan.smg.api.phs20;

import java.nio.*;

import net.gleamynode.netty2.*;

public class PHSMessageRecognizer implements MessageRecognizer {
    public PHSMessageRecognizer() {
    }

    public Message recognize(ByteBuffer buf) throws MessageParseException {
        // return null if the message header has not fully arrived yet.
        if (buf.remaining() < PHSMessage.lenMessageHeader)
            return null;

        int totalLength = buf.getInt();
        int commandId = buf.getInt();
        int sequenceId = buf.getInt();

        // pick the Message implementation that matches the command id
        switch (commandId) {
        case PHSLoginRep.COMMAND_ID:
            return new PHSLoginRep(totalLength, commandId, sequenceId);
        default:
            throw new MessageParseException("unknown type: " + commandId);
        }
    }
}
[/code:1]
To save effort, only the Login and LoginRep packets are implemented here.
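The recognizer above reads the header with relative getInt() calls, which advance the buffer position, and PHSMessage.read() later reads the same header again. The post does not say how Netty2 treats the position between those two calls. If you would rather not depend on that, here is a minimal sketch that inspects commandId with ByteBuffer's absolute getInt(int), which leaves the position untouched. HeaderPeek and peekCommandId are hypothetical names, not part of Netty2 or of the code in this thread.

```java
import java.nio.ByteBuffer;
import java.util.OptionalInt;

/** Hypothetical helper: looks at the command id without consuming any bytes. */
final class HeaderPeek {
    static final int HEADER_LENGTH = 12;
    static final int COMMAND_ID_OFFSET = 4; // commandId follows the 4-byte totalLength

    /** Returns the command id, or empty if the full header has not arrived yet. */
    static OptionalInt peekCommandId(ByteBuffer buf) {
        if (buf.remaining() < HEADER_LENGTH) {
            return OptionalInt.empty();
        }
        // Absolute read: the index is relative to the start of the buffer,
        // so the current position has to be added.
        return OptionalInt.of(buf.getInt(buf.position() + COMMAND_ID_OFFSET));
    }
}
```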

Time: 2004-12-02
2. Implement the protocol packets (PHSLogin, PHSLoginRep). They need to implement Netty's Message interface. Because much of a packet can be abstracted, I also defined an abstract class, PHSMessage. (The commented-out code is left over from before I used Netty, when I was working with the JDK io classes directly.)
[code:1]
package net.smsfan.smg.api.phs20;

import java.nio.ByteBuffer;

import net.gleamynode.netty2.Message;
import net.gleamynode.netty2.MessageParseException;

public abstract class PHSMessage implements Message {
    // Max Content Length
    public static final int LEN_MAX_CONTENT_ASCII = 160;
    public static final int LEN_MAX_CONTENT = 140;
    // Max DestUsr_Tl
    public static final int MAX_DESTUSR_TL = 99;
    public static final int MAX_MSG_LEVEL = 9;
    public static final int MIN_MSG_LEVEL = 0;
    public static final int MSG_TYPE_ASCII = 0;
    public static final int MSG_TYPE_WRITECARD = 3;
    public static final int MSG_TYPE_BINARY = 4;
    public static final int MSG_TYPE_UCS2 = 8;
    public static final int MSG_TYPE_CHINESE = 15;
    public static final int DELIVER_TYPE_COMMAN_DELIVER = 0;
    public static final int DELIVER_TYPE_REPORT_DELIVER = 1;

    /* Message header definition */
    public static final int lenMessageHeader = 12;

    private int totalLength;
    private int commandId;
    private int sequenceId;

    // Used for received packets: the recognizer has already parsed the header.
    public PHSMessage(int totalLength, int commandId, int sequenceId) {
        this.totalLength = totalLength;
        this.commandId = commandId;
        this.sequenceId = sequenceId;
    }

    // Used for outgoing packets: a fresh sequence id is allocated.
    public PHSMessage(int commandId) {
        this.totalLength = 0;
        this.commandId = commandId;
        PHSSeq seq = new PHSSeq();
        this.sequenceId = seq.getSeq();
    }

    // public PHSMessage(PHSIO cmppio) throws PHSException {
    //     super.getPacket(cmppio, lenMessageHeader);
    //
    //     totalLength = getInteger();
    //     commandId = getInteger();
    //     sequenceId = getInteger();
    //     int i = totalLength - lenMessageHeader;
    //     if (i > 0)
    //         super.getPacket(cmppio, i);
    // }
    //
    // protected void encodePacket(PHSIO cmppio) throws PHSException {
    //     totalLength = getLength() + lenMessageHeader;
    //     insertInteger(sequenceId);
    //     insertInteger(commandId);
    //     insertInteger(totalLength);
    //     super.encodePacket(cmppio);
    // }

    public int getCommandId() {
        return commandId;
    }

    public void setCommandId(int commandId) {
        this.commandId = commandId;
    }

    public int getTotalLength() {
        return totalLength;
    }

    public int getSequenceId() {
        return sequenceId;
    }

    public void setSequenceId(int i) {
        sequenceId = i;
    }

    public void setTotalLength(int totalLength) {
        this.totalLength = totalLength;
    }

    protected abstract void readBody(ByteBuffer buf) throws MessageParseException;

    protected abstract void writeBody(ByteBuffer buf);

    protected abstract void setTotalLength();

    private void readHeader(ByteBuffer buf) throws MessageParseException {
        totalLength = buf.getInt();
        commandId = buf.getInt();
        sequenceId = buf.getInt();
    }

    public boolean read(ByteBuffer buf) throws MessageParseException {
        // wait until the whole packet has arrived
        if (buf.remaining() < totalLength)
            return false;
        readHeader(buf);
        int i = totalLength - lenMessageHeader;
        if (i > 0) {
            readBody(buf);
        }
        return true;
    }

    public boolean write(ByteBuffer buf) {
        // Compute the length before checking the free space: totalLength is
        // still 0 for a freshly built request, so the check was a no-op before.
        setTotalLength();
        if (buf.remaining() < totalLength)
            return false;
        writeHeader(buf);
        writeBody(buf);
        return true;
    }

    private void writeHeader(ByteBuffer buf) {
        buf.putInt(totalLength);
        buf.putInt(commandId);
        buf.putInt(sequenceId);
    }

    // Encodes str0 into a fixed-width field of lenStr0 bytes, zero-padded or truncated.
    protected byte[] getBytes(String str0, int lenStr0) {
        if (str0 == null)
            throw new IllegalArgumentException("insertString : null String !");
        if (lenStr0 < 0)
            throw new IllegalArgumentException("insertStrings : LenStr0 can't < 0 !");
        byte[] padded = new byte[lenStr0]; // new arrays are already zero-filled
        byte[] raw = str0.getBytes();
        // Copy by byte count, not character count: they differ for multi-byte text.
        System.arraycopy(raw, 0, padded, 0, Math.min(raw.length, lenStr0));
        return padded;
    }
}
[/code:1]
[code:1]
package net.smsfan.smg.api.phs20;

import java.nio.ByteBuffer;

import net.gleamynode.netty2.MessageParseException;

public class PHSLogin extends PHSMessage {
    static final int COMMAND_ID = 1;
    static final int lenSourceAddr = 8;
    static final int lenAuthenticatorSP = 16;
    static final int lenLoginMode = 1;
    static final int lenVersion = 1;
    static final int lenTimestamp = 4;
    static final int lenMessageBody = lenSourceAddr + lenAuthenticatorSP
            + lenLoginMode + lenTimestamp + lenVersion;

    String sourceAddr;
    String authenticatorSP;
    byte loginMode = 0;
    int timestamp = 0;
    byte version = 0;

    public PHSLogin(String sourceAddr, String authenticatorSP, byte loginMode,
            byte version) throws PHSException {
        super(COMMAND_ID);
        setClientId(sourceAddr);
        setAuthenticatorSP(authenticatorSP);
        setVersion(version);
        setLoginMode(loginMode);
        setTimestamp();
    }

    public void setClientId(String sourceAddr) throws PHSException {
        if (sourceAddr == null)
            throw new PHSException("setSource_Addr : Source_Addr is null !");
        this.sourceAddr = sourceAddr;
    }

    public void setAuthenticatorSP(String authenticatorSP) throws PHSException {
        if (authenticatorSP == null)
            throw new PHSException("setAuthenticatorSP : AuthenticatorSP is null !");
        this.authenticatorSP = authenticatorSP;
    }

    public void setLoginMode(byte loginMode) throws PHSException {
        this.loginMode = loginMode;
    }

    public void setVersion(byte version) throws PHSException {
        this.version = version;
    }

    private void setTimestamp() throws PHSException {
        timestamp = PHSUtil.getTimestamp();
    }

    public String getSourceAddr() throws PHSException {
        return sourceAddr;
    }

    public String getAuthenticatorSP() throws PHSException {
        return authenticatorSP;
    }

    public byte getVersion() throws PHSException {
        return version;
    }

    public int getTimestamp() throws PHSException {
        return timestamp;
    }

    // The client only sends Login, so it never has to parse one.
    protected void readBody(ByteBuffer buf) throws MessageParseException {
        throw new UnsupportedOperationException();
    }

    protected void writeBody(ByteBuffer buf) {
        // if (sourceAddr == null)
        //     throw new IllegalArgumentException("encodePacket : Source_Addr is null !");
        // if (authenticatorSP == null)
        //     throw new IllegalArgumentException("encodePacket : AuthenticatorSP is null !");
        byte[] spaceString = {0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
        // the authenticator is a hash over id + zero bytes + shared secret + timestamp
        byte[] md5authenticatorSP = PHSUtil.hash(sourceAddr + new String(spaceString)
                + authenticatorSP + PHSUtil.zeroPadString(String.valueOf(timestamp), 10));
        buf.put(getBytes(sourceAddr, lenSourceAddr));
        buf.put(md5authenticatorSP);
        buf.put(loginMode);
        buf.putInt(timestamp);
        buf.put(version);
    }

    protected void setTotalLength() {
        setTotalLength(lenMessageBody + lenMessageHeader);
    }
}
[/code:1]
[code:1]
package net.smsfan.smg.api.phs20;

import java.nio.ByteBuffer;

import net.gleamynode.netty2.MessageParseException;

public class PHSLoginRep extends PHSMessage {
    public static final int COMMAND_ID = 0x80000001;
    private static final int lenStatus = 4;
    private static final int lenAuthenticatorISMG = 16;
    private static final int lenVersion = 1;
    private static final int lenMessageBody = lenStatus + lenAuthenticatorISMG + lenVersion;

    private int status;
    private String authenticatorISMG;
    private byte version;

    public int getStatus() {
        return status;
    }

    public String getAuthenticatorISMG() {
        return authenticatorISMG;
    }

    public byte getVersion() {
        return version;
    }

    public PHSLoginRep(int totalLength, int commandId, int sequenceId) {
        super(totalLength, commandId, sequenceId);
    }

    protected void readBody(ByteBuffer buf) throws MessageParseException {
        status = buf.getInt();
        byte[] auth = new byte[lenAuthenticatorISMG];
        buf.get(auth);
        authenticatorISMG = new String(auth);
        version = buf.get();
    }

    // The client only receives LoginRep, so there is nothing to write.
    protected void writeBody(ByteBuffer buf) {
    }

    protected void setTotalLength() {
        throw new UnsupportedOperationException();
    }
}
[/code:1]
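PHSUtil is not shown in this thread, so here is a rough sketch of what the authenticator computation in PHSLogin.writeBody() might look like with the JDK alone. Everything in it is an assumption: that PHSUtil.hash is MD5 (its 16-byte output would match lenAuthenticatorSP = 16), that the strings are plain ASCII, and that the timestamp is non-negative. LoginDigest, authenticator and zeroPad are hypothetical names. The input layout (source address, 7 zero bytes, shared secret, timestamp left-padded with zeros to 10 digits) is copied from the posted writeBody().

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical stand-in for the PHSUtil helpers that the post does not show. */
final class LoginDigest {
    /** Left-pads a non-negative number with '0' up to the given width. */
    static String zeroPad(int value, int width) {
        String digits = String.valueOf(value);
        StringBuilder sb = new StringBuilder();
        for (int i = digits.length(); i < width; i++) {
            sb.append('0');
        }
        return sb.append(digits).toString();
    }

    /** MD5 over sourceAddr + 7 zero bytes + secret + 10-digit timestamp (assumed layout). */
    static byte[] authenticator(String sourceAddr, String secret, int timestamp) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            md5.update(sourceAddr.getBytes(StandardCharsets.US_ASCII));
            md5.update(new byte[7]); // the 7 zero bytes from the posted spaceString
            md5.update(secret.getBytes(StandardCharsets.US_ASCII));
            md5.update(zeroPad(timestamp, 10).getBytes(StandardCharsets.US_ASCII));
            return md5.digest(); // always 16 bytes for MD5
        } catch (NoSuchAlgorithmException e) {
            // Every conforming Java platform ships MD5, so this should not happen.
            throw new IllegalStateException("MD5 not available", e);
        }
    }
}
```

Feeding the pieces to the digest as bytes avoids the round trip through new String(spaceString), which only matters once non-ASCII data is involved.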

Time: 2004-12-02
OK, with these three classes the conversion between the socket byte stream and Message objects is done. From here on it is just a matter of adding further Message classes as the protocol requires.
Now let's get it running. Implement Netty's SessionListener interface and you are notified when the connection is established, when a message is received and when the connection is closed:
[code:1]
package net.smsfan.smg.api.phs20;

import java.io.*;
import java.net.*;
import java.util.*;

import net.gleamynode.netty2.*;

public class PHSClient implements SessionListener {
    private static final int CONNECT_TIMEOUT = 30; // seconds
    private static final int DISPATCHER_THREAD_POOL_SIZE = 10;

    private Hashtable lockMap = new Hashtable(); // Packet.seq --> Object
    private Hashtable waitPackets = new Hashtable(); // Packet.seq --> response packet
    private LinkedList listeners = new LinkedList(); // packet receivers
    private boolean debug;
    private int status = 0; // 0 -- not logged in ; 1 -- logged in
    private IoProcessor ioProcessor;
    private ThreadPooledEventDispatcher eventDispatcher;
    private Session session;

    public PHSClient(String host, int port) throws IOException {
        debug = true;
        PHSClientInit(host, port);
    }

    public void PHSClientInit(String host, int port) throws IOException {
        // initialize I/O processor and event dispatcher
        ioProcessor = new IoProcessor();
        eventDispatcher = new OrderedEventDispatcher();

        // start with the default number of I/O worker threads
        ioProcessor.start();

        // start with a few event dispatcher threads
        eventDispatcher.setThreadPoolSize(DISPATCHER_THREAD_POOL_SIZE);
        eventDispatcher.start();

        // prepare message recognizer
        MessageRecognizer recognizer = new PHSMessageRecognizer();

        // create a client session
        session = new Session(ioProcessor, new InetSocketAddress(host, port),
                recognizer, eventDispatcher);

        // set configuration
        session.getConfig().setConnectTimeout(CONNECT_TIMEOUT);

        // subscribe and start communication
        session.addSessionListener(this);
        log("Connecting to " + session.getSocketAddress(), debug);
        session.start();
    }

    public PHSLoginRep login(PHSLogin login, long waitTime) throws PHSException {
        PHSLoginRep rep = (PHSLoginRep) putRequestForRep(login, waitTime);
        if (rep == null)
            return null;
        if (rep.getStatus() == 0) {
            this.status = 1;
        }
        return rep;
    }

    public void close() {
        try {
            lockMap.clear();
            waitPackets.clear();
            listeners.clear();
        } catch (Exception ex1) {
        }
        // stop I/O processor and event dispatcher
        eventDispatcher.stop();
        ioProcessor.stop();
    }

    // Sends a request and blocks for up to waitTime ms until the matching response arrives.
    private PHSMessage putRequestForRep(PHSMessage msg, long waitTime) throws PHSException {
        if (msg == null || waitTime < 0)
            return null;
        Integer seq = new Integer(msg.getSequenceId());
        Object lock = new Object();
        lockMap.put(seq, lock);
        putRequest(msg);
        PHSMessage resp;
        // Check and wait while holding the lock, so a response that arrives
        // between the check and the wait cannot slip past unnoticed.
        synchronized (lock) {
            resp = removeWaitPacket(seq);
            if (resp == null) {
                try {
                    lock.wait(waitTime);
                } catch (InterruptedException ex) {
                }
                resp = removeWaitPacket(seq);
            }
        }
        lockMap.remove(seq);
        return resp;
    }

    public void addWaitPacket(PHSMessage packet) {
        waitPackets.put(new Integer(packet.getSequenceId()), packet);
    }

    public PHSMessage removeWaitPacket(Integer seq) {
        return (PHSMessage) waitPackets.remove(seq);
    }

    private void putRequest(PHSMessage msg) throws PHSException {
        session.write(msg);
    }

    protected void finalize() {
        close();
    }

    public boolean addListener(PHSClientListener listener) {
        return listeners.add(listener);
    }

    public boolean removeListener(PHSClientListener listener) {
        return listeners.remove(listener);
    }

    private void log(String str, boolean debug) {
        if (debug)
            System.out.println(str);
    }

    private void fireConnected() {
        final Object[] alisteners = listeners.toArray();
        final int size = alisteners.length;
        for (int i = 0; i < size; i++) {
            try {
                ((PHSClientListener) alisteners[i]).onClientConnected();
            } catch (Exception ex1) {
            }
        }
    }

    private void fireDisconnected() {
        final Object[] alisteners = listeners.toArray();
        final int size = alisteners.length;
        for (int i = 0; i < size; i++) {
            try {
                ((PHSClientListener) alisteners[i]).onClientDisconnected();
            } catch (Exception ex1) {
            }
        }
    }

    public boolean isConnected() {
        return session.isConnected();
    }

    // SessionListener implementation
    public void connectionEstablished(Session session) {
        fireConnected();
    }

    public void connectionClosed(Session session) {
        fireDisconnected();
    }

    public void messageReceived(Session session, Message message) {
        PHSMessage msg = (PHSMessage) message;
        // responses have the high bit of commandId set
        if ((msg.getCommandId() & 0x80000000) != 0) {
            // Look the lock up once; the waiting thread removes it when it is
            // done, so a missing entry means nobody is waiting any more.
            Object lock = lockMap.get(new Integer(msg.getSequenceId()));
            if (lock != null) {
                addWaitPacket(msg);
                synchronized (lock) {
                    lock.notify();
                }
                return;
            }
        }
        switch (msg.getCommandId()) {
        default: {
            log("unknown Message=" + msg.getCommandId(), debug);
        }
        }
    }

    public void messageSent(Session session, Message message) {
    }

    public void sessionIdle(Session session) {
    }

    public void exceptionCaught(Session session, Throwable throwable) {
        throwable.printStackTrace();
    }
}
[/code:1]
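The core of PHSClient is matching each response to the request that is waiting for it by sequenceId. The post does this with two Hashtables and wait/notify. As an alternative, and only as a sketch, the same idea can be written with java.util.concurrent, which needs a newer JDK than the one this 2004 code targets. ResponseCorrelator and its methods are hypothetical names and have nothing to do with Netty2.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: pairs responses with pending requests by sequence id. */
final class ResponseCorrelator<T> {
    private final ConcurrentMap<Integer, CompletableFuture<T>> pending =
            new ConcurrentHashMap<>();

    /** Call before sending the request, so an early response is not lost. */
    CompletableFuture<T> register(int sequenceId) {
        CompletableFuture<T> future = new CompletableFuture<>();
        pending.put(sequenceId, future);
        return future;
    }

    /** Call from the receive path; returns false if nobody is waiting for this id. */
    boolean complete(int sequenceId, T response) {
        CompletableFuture<T> future = pending.remove(sequenceId);
        return future != null && future.complete(response);
    }

    /** Waits up to timeoutMillis; returns null on timeout, failure or interruption. */
    T await(int sequenceId, CompletableFuture<T> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            return null;
        } finally {
            pending.remove(sequenceId, future); // no-op if already completed
        }
    }
}
```

In PHSClient terms, putRequestForRep() would call register() and await(), and messageReceived() would call complete(); a response that arrives after the timeout makes complete() return false and falls through to the normal dispatch.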

Time: 2004-12-02
Finally, write an Example class to run it:
[code:1]
package net.smsfan.smg.api.phs20.example;

import java.io.*;
import java.util.Properties;

import net.smsfan.smg.api.phs20.*;

public class TestClient implements PHSClientListener {
    private String serverHost = "218.66.104.104";
    private int serverPort = 8890;
    private String name = "";
    private String password = "";
    private PHSClient client;

    public TestClient() throws IOException, PHSException {
        loadProperties();
        // makeConnection() registers this object as a listener itself;
        // registering again here would deliver every event twice.
        makeConnection();
    }

    /**
     * Reads serverHost, serverPort, name and password from smg.properties.
     */
    private void loadProperties() {
        Properties props = new Properties();
        try {
            FileInputStream fileinputstream = new FileInputStream("smg.properties");
            props.load(fileinputstream);
            fileinputstream.close();
        } catch (Exception e) {
            System.out.println(
                    "Unable to read configuration file, please put smg.properties in the working directory!");
            System.exit(1); // non-zero: this is an error exit
        }
        serverHost = props.getProperty("serverHost");
        serverPort = Integer.parseInt(props.getProperty("serverPort"));
        name = props.getProperty("name");
        password = props.getProperty("password");
    }

    public void connect() throws Exception {
        String strSource_Addr = name;
        String strAuthenticatorSP = password;
        byte version = 0;
        byte loginMode = 2;
        PHSLogin login = new PHSLogin(strSource_Addr, strAuthenticatorSP, loginMode, version);
        PHSLoginRep rep = client.login(login, 10000);
        if (rep == null) {
            System.out.println("No response packet !");
            return;
        }
        if (rep.getCommandId() != 0x80000001)
            System.out.println("Invalid command id !");
        if (rep.getSequenceId() != login.getSequenceId())
            System.out.println("Invalid sequence id !");
        System.out.println("Status = " + rep.getStatus());
        System.out.println("AuthenticatorISMG = " + rep.getAuthenticatorISMG());
        System.out.println("Version = " + rep.getVersion());
    }

    public static void main(String args[]) {
        // read commands ("connect" or "quit") from the console
        InputStreamReader CMPPCommand = new InputStreamReader(System.in);
        BufferedReader buffCMPPCommand = new BufferedReader(CMPPCommand);
        try {
            TestClient cmppclient = new TestClient();
            String s;
            while ((s = buffCMPPCommand.readLine()) != null) {
                if (s.trim().equals("connect")) {
                    System.out.println(
                            "Attempting to Send PHS_Login Command to Server ......");
                    cmppclient.connect();
                    continue;
                }
                if (s.trim().equals("quit")) {
                    System.out.println("CMPP Client quit .");
                    System.exit(0); // normal exit
                }
                System.out.println("unrecognised command !");
            }
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }

    private synchronized void makeConnection() {
        if (client != null) {
            if (client.isConnected())
                return;
            client.close();
            client = null;
        }
        boolean isConnected = false;
        try {
            while (!isConnected) {
                try {
                    client = new PHSClient(serverHost, serverPort);
                    isConnected = true;
                } catch (Exception ex1) {
                    ex1.printStackTrace();
                    Thread.sleep(1000); // pause before retrying instead of spinning
                }
            }
            client.addListener(this);
            boolean isSentConnected = false;
            while (!isSentConnected) {
                if (client.isConnected()) {
                    connect();
                    isSentConnected = true;
                } else {
                    Thread.sleep(100); // poll without burning a CPU core
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public void onClientDisconnected() {
        System.out.println("Client Disconnected!!!Reconnect ... ");
        makeConnection();
    }

    public void onClientConnected() {
    }
}
[/code:1]
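makeConnection() above keeps retrying until the connection succeeds. For production use you would probably want the retries bounded and spaced out. A minimal sketch of that idea, independent of Netty2: Retry and withBackoff are hypothetical names, and the doubling delay with a 30-second cap is just one reasonable choice, not something taken from the post.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: retries an action a limited number of times with growing pauses. */
final class Retry {
    private static final long MAX_DELAY_MILLIS = 30000L;

    static <T> T withBackoff(Callable<T> action, int maxAttempts, long initialDelayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
                delay = Math.min(delay * 2, MAX_DELAY_MILLIS); // double the pause, up to the cap
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts", last);
    }
}
```

With the classes from this thread, the connect step could then read something like Retry.withBackoff(() -> new PHSClient(serverHost, serverPort), 5, 1000L).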

Time: 2004-12-02
As a next step, I am thinking about how to integrate this with Spring and turn it into a Spring component. I expect that will be easy to do.
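
Time: 2004-12-03
Netty2 is a nice NIO framework. If you are interested, you could also try http://sourceforge.net/projects/cindy/. It supports SocketChannel/ServerSocketChannel/DatagramChannel/Pipe and also emulates a MulticastChannel, so an application can access TCP and UDP through the same model.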

Time: 2004-12-04
THX, I'll give it a try. My main goal is to find a network communication framework for building the server side of an IM application.

Time: 2004-12-04
Why does Cindy look so much like Netty?
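
Time: 2004-12-04
The main idea behind Cindy came from an example in the book Java NIO. After reading it I went through the open-source NIO implementations I could find at the time, and Netty2 was one of them, so the MessageRecognizer class in Cindy was in fact learned from Netty2.
However, my understanding of NIO differs somewhat from Netty2's; otherwise I would have joined its development team :) As the Cindy 1.0 release shows, Cindy 1.0 supported several NIO-based models. After a few real projects I found that some of those models had no real advantage in practice, so I trimmed them down, kept the most commonly used ones, and added UDP and Pipe support.
Cindy was extracted from http://sourceforge.net/projects/java-jml. At first it was only the foundation of that MSN library, but because I often used the NIO framework on its own in my projects, I split it out into the Cindy project.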