From do-not-reply at jboss.org Fri Jan 6 07:15:08 2012 Content-Type: multipart/mixed; boundary="===============9001660138032942260==" MIME-Version: 1.0 From: do-not-reply at jboss.org To: hornetq-commits at lists.jboss.org Subject: [hornetq-commits] JBoss hornetq SVN: r11985 - trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11. Date: Fri, 06 Jan 2012 07:15:08 -0500 Message-ID: <201201061215.q06CF8kP005473@svn01.web.mwc.hst.phx2.redhat.com> --===============9001660138032942260== Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: quoted-printable Author: borges Date: 2012-01-06 07:15:07 -0500 (Fri, 06 Jan 2012) New Revision: 11985 Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/Sto= mpFrameHandlerV11.java Log: Fix waitTime comparison (which used waitTime1 twice instead of using waitTi= me2). Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/= v11/StompFrameHandlerV11.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/St= ompFrameHandlerV11.java 2012-01-06 12:10:51 UTC (rev 11984) +++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/St= ompFrameHandlerV11.java 2012-01-06 12:15:07 UTC (rev 11985) @@ -28,15 +28,15 @@ import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler; = /** - * = + * * @author Howard Gao */ public class StompFrameHandlerV11 extends VersionedStompFrameHandler imple= ments FrameEventListener { private static final Logger log =3D Logger.getLogger(StompFrameHandlerV= 11.class); - = + private static final char ESC_CHAR =3D '\\'; - = + private HeartBeater heartBeater; = public StompFrameHandlerV11(StompConnection connection) @@ -130,12 +130,12 @@ { throw new HornetQStompException("Incorrect heartbeat header " + h= eartBeatHeader); } - = + //client ping long minPingInterval =3D Long.valueOf(params[0]); //client receive ping long minAcceptInterval =3D Long.valueOf(params[1]); - = + if ((minPingInterval !=3D 0) || (minAcceptInterval !=3D 0)) { heartBeater =3D new HeartBeater(minPingInterval, minAcceptInterva= l); @@ -177,7 +177,7 @@ response =3D new HornetQStompException("Must specify the subscri= ption's id").getFrame(); return response; } - = + try { connection.unsubscribe(subscriptionID); @@ -193,7 +193,7 @@ public StompFrame onAck(StompFrame request) { StompFrame response =3D null; - = + String messageID =3D request.getHeader(Stomp.Headers.Ack.MESSAGE_ID); String txID =3D request.getHeader(Stomp.Headers.TRANSACTION); String subscriptionID =3D request.getHeader(Stomp.Headers.Ack.SUBSCR= IPTION); @@ -202,13 +202,13 @@ { log.warn("Transactional acknowledgement is not supported"); } - = + if (subscriptionID =3D=3D null) { response =3D new HornetQStompException("subscription header is re= quired").getFrame(); return response; } - = + try { connection.acknowledge(messageID, subscriptionID); @@ -247,7 +247,7 @@ //kick off the pinger startHeartBeat(); } - = + if (reply.needsDisconnect()) { connection.disconnect(); @@ -261,7 +261,7 @@ } } } - = + private void startHeartBeat() { if (heartBeater !=3D null) @@ -269,25 +269,25 @@ heartBeater.start(); } } - = + public StompFrame createPingFrame() throws UnsupportedEncodingException { StompFrame frame =3D new StompFrame(Stomp.Commands.STOMP); frame.setPing(true); return frame; } - = - //server heart beat = - //algorithm: = - //(a) server ping: if server hasn't sent any frame within serverPing = - //interval, send a ping. = + + //server heart beat + //algorithm: + //(a) server ping: if server hasn't sent any frame within serverPing + //interval, send a ping. //(b) accept ping: if server hasn't received any frame within // 2*serverAcceptPing, disconnect! private class HeartBeater extends Thread { final int MIN_SERVER_PING =3D 500; final int MIN_CLIENT_PING =3D 500; - = + long serverPing =3D 0; long serverAcceptPing =3D 0; volatile boolean shutdown =3D false; @@ -301,13 +301,13 @@ { serverAcceptPing =3D clientPing > MIN_CLIENT_PING ? clientPing= : MIN_CLIENT_PING; } - = + if (clientAcceptPing !=3D 0) { serverPing =3D clientAcceptPing > MIN_SERVER_PING ? clientAcce= ptPing : MIN_SERVER_PING; } } - = + public synchronized void shutdown() { shutdown =3D true; @@ -336,14 +336,14 @@ { log.error("Cannot create ping frame due to encoding problem.",= e1); } - = + synchronized (this) { while (!shutdown) { long dur1 =3D 0; long dur2 =3D 0; - = + if (serverPing !=3D 0) { dur1 =3D System.currentTimeMillis() - lastPingTime.get(); @@ -358,7 +358,7 @@ if (serverAcceptPing !=3D 0) { dur2 =3D System.currentTimeMillis() - lastAccepted.get(); - = + if (dur2 > (2 * serverAcceptPing)) { connection.disconnect(); @@ -366,25 +366,25 @@ break; } } - = + long waitTime1 =3D 0; long waitTime2 =3D 0; - = + if (serverPing > 0) { waitTime1 =3D serverPing - dur1; } - = + if (serverAcceptPing > 0) { waitTime2 =3D serverAcceptPing * 2 - dur2; } - = + long waitTime =3D 10l; - = - if ((waitTime1 > 0) && (waitTime1 > 0)) + + if ((waitTime1 > 0) && (waitTime2 > 0)) { - waitTime =3D waitTime1 < waitTime2 ? waitTime1 : waitTim= e2; + waitTime =3D Math.min(waitTime1, waitTime2); } else if (waitTime1 > 0) { @@ -394,7 +394,7 @@ { waitTime =3D waitTime2; } - = + try { this.wait(waitTime); @@ -426,7 +426,7 @@ { return new StompFrameV11(command); } - = + //all frame except CONNECT are decoded here. @Override public StompFrame decode(StompDecoder decoder, final HornetQBuffer buff= er) throws HornetQStompException @@ -441,11 +441,11 @@ buffer.readBytes(decoder.workingBuffer, decoder.data, readable); = decoder.data +=3D readable; - = + if (decoder.command =3D=3D null) { int offset =3D 0; - = + //check for ping while (decoder.workingBuffer[offset] =3D=3D StompDecoder.NEW_LINE) { @@ -466,7 +466,7 @@ return null; } } - = + if (decoder.data < 4) { // Need at least four bytes to identify the command @@ -527,7 +527,7 @@ decoder.command =3D StompDecoder.COMMAND_COMMIT; } /**** added by meddy, 27 april 2011, handle header parser f= or reply to websocket protocol ****/ - else if (decoder.workingBuffer[offset+7] =3D=3D StompDecode= r.E) = + else if (decoder.workingBuffer[offset+7] =3D=3D StompDecode= r.E) { if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_= CONNECTED_LENGTH + 1)) { @@ -535,7 +535,7 @@ } = // CONNECTED - decoder.command =3D StompDecoder.COMMAND_CONNECTED; = = + decoder.command =3D StompDecoder.COMMAND_CONNECTED; } /**** end ****/ else @@ -680,8 +680,8 @@ // Now the headers = boolean isEscaping =3D false; - SimpleBytes holder =3D new SimpleBytes(1024); = - = + SimpleBytes holder =3D new SimpleBytes(1024); + outer: while (true) { byte b =3D decoder.workingBuffer[decoder.pos++]; @@ -724,7 +724,7 @@ { throw new HornetQStompException("Encoding excep= tion", e); } - = + holder.reset(); = decoder.inHeaderName =3D false; @@ -772,14 +772,14 @@ throw new HornetQStompException("Encoding exception."= , e); } holder.reset(); - = + decoder.headers.put(decoder.headerName, headerValue); = if (decoder.headerName.equals(StompDecoder.CONTENT_LENGT= H_HEADER_NAME)) { decoder.contentLength =3D Integer.parseInt(headerValu= e); } - = + if (decoder.headerName.equals(StompDecoder.CONTENT_TYPE_= HEADER_NAME)) { decoder.contentType =3D headerValue; @@ -800,7 +800,7 @@ decoder.whiteSpaceOnly =3D false; = decoder.headerValueWhitespace =3D false; - = + holder.append(b); } } @@ -814,7 +814,7 @@ } = // Now the body - = + byte[] content =3D null; = if (decoder.contentLength !=3D -1) @@ -830,7 +830,7 @@ System.arraycopy(decoder.workingBuffer, decoder.pos, content, = 0, decoder.contentLength); = decoder.pos +=3D decoder.contentLength; - = + //drain all the rest if (decoder.bodyStart =3D=3D -1) { @@ -867,7 +867,7 @@ } } } - = + if (content !=3D null) { if (decoder.data > decoder.pos) @@ -894,5 +894,5 @@ return null; } } - = + } --===============9001660138032942260==--