Author: borges
Date: 2012-01-06 07:15:07 -0500 (Fri, 06 Jan 2012)
New Revision: 11985
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
Log:
Fix waitTime comparison (which used waitTime1 twice instead of using waitTime2).
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2012-01-06
12:10:51 UTC (rev 11984)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2012-01-06
12:15:07 UTC (rev 11985)
@@ -28,15 +28,15 @@
import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
/**
- *
+ *
* @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
*/
public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
FrameEventListener
{
private static final Logger log = Logger.getLogger(StompFrameHandlerV11.class);
-
+
private static final char ESC_CHAR = '\\';
-
+
private HeartBeater heartBeater;
public StompFrameHandlerV11(StompConnection connection)
@@ -130,12 +130,12 @@
{
throw new HornetQStompException("Incorrect heartbeat header " +
heartBeatHeader);
}
-
+
//client ping
long minPingInterval = Long.valueOf(params[0]);
//client receive ping
long minAcceptInterval = Long.valueOf(params[1]);
-
+
if ((minPingInterval != 0) || (minAcceptInterval != 0))
{
heartBeater = new HeartBeater(minPingInterval, minAcceptInterval);
@@ -177,7 +177,7 @@
response = new HornetQStompException("Must specify the subscription's
id").getFrame();
return response;
}
-
+
try
{
connection.unsubscribe(subscriptionID);
@@ -193,7 +193,7 @@
public StompFrame onAck(StompFrame request)
{
StompFrame response = null;
-
+
String messageID = request.getHeader(Stomp.Headers.Ack.MESSAGE_ID);
String txID = request.getHeader(Stomp.Headers.TRANSACTION);
String subscriptionID = request.getHeader(Stomp.Headers.Ack.SUBSCRIPTION);
@@ -202,13 +202,13 @@
{
log.warn("Transactional acknowledgement is not supported");
}
-
+
if (subscriptionID == null)
{
response = new HornetQStompException("subscription header is
required").getFrame();
return response;
}
-
+
try
{
connection.acknowledge(messageID, subscriptionID);
@@ -247,7 +247,7 @@
//kick off the pinger
startHeartBeat();
}
-
+
if (reply.needsDisconnect())
{
connection.disconnect();
@@ -261,7 +261,7 @@
}
}
}
-
+
private void startHeartBeat()
{
if (heartBeater != null)
@@ -269,25 +269,25 @@
heartBeater.start();
}
}
-
+
public StompFrame createPingFrame() throws UnsupportedEncodingException
{
StompFrame frame = new StompFrame(Stomp.Commands.STOMP);
frame.setPing(true);
return frame;
}
-
- //server heart beat
- //algorithm:
- //(a) server ping: if server hasn't sent any frame within serverPing
- //interval, send a ping.
+
+ //server heart beat
+ //algorithm:
+ //(a) server ping: if server hasn't sent any frame within serverPing
+ //interval, send a ping.
//(b) accept ping: if server hasn't received any frame within
// 2*serverAcceptPing, disconnect!
private class HeartBeater extends Thread
{
final int MIN_SERVER_PING = 500;
final int MIN_CLIENT_PING = 500;
-
+
long serverPing = 0;
long serverAcceptPing = 0;
volatile boolean shutdown = false;
@@ -301,13 +301,13 @@
{
serverAcceptPing = clientPing > MIN_CLIENT_PING ? clientPing :
MIN_CLIENT_PING;
}
-
+
if (clientAcceptPing != 0)
{
serverPing = clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing :
MIN_SERVER_PING;
}
}
-
+
public synchronized void shutdown()
{
shutdown = true;
@@ -336,14 +336,14 @@
{
log.error("Cannot create ping frame due to encoding problem.",
e1);
}
-
+
synchronized (this)
{
while (!shutdown)
{
long dur1 = 0;
long dur2 = 0;
-
+
if (serverPing != 0)
{
dur1 = System.currentTimeMillis() - lastPingTime.get();
@@ -358,7 +358,7 @@
if (serverAcceptPing != 0)
{
dur2 = System.currentTimeMillis() - lastAccepted.get();
-
+
if (dur2 > (2 * serverAcceptPing))
{
connection.disconnect();
@@ -366,25 +366,25 @@
break;
}
}
-
+
long waitTime1 = 0;
long waitTime2 = 0;
-
+
if (serverPing > 0)
{
waitTime1 = serverPing - dur1;
}
-
+
if (serverAcceptPing > 0)
{
waitTime2 = serverAcceptPing * 2 - dur2;
}
-
+
long waitTime = 10l;
-
- if ((waitTime1 > 0) && (waitTime1 > 0))
+
+ if ((waitTime1 > 0) && (waitTime2 > 0))
{
- waitTime = waitTime1 < waitTime2 ? waitTime1 : waitTime2;
+ waitTime = Math.min(waitTime1, waitTime2);
}
else if (waitTime1 > 0)
{
@@ -394,7 +394,7 @@
{
waitTime = waitTime2;
}
-
+
try
{
this.wait(waitTime);
@@ -426,7 +426,7 @@
{
return new StompFrameV11(command);
}
-
+
//all frame except CONNECT are decoded here.
@Override
public StompFrame decode(StompDecoder decoder, final HornetQBuffer buffer) throws
HornetQStompException
@@ -441,11 +441,11 @@
buffer.readBytes(decoder.workingBuffer, decoder.data, readable);
decoder.data += readable;
-
+
if (decoder.command == null)
{
int offset = 0;
-
+
//check for ping
while (decoder.workingBuffer[offset] == StompDecoder.NEW_LINE)
{
@@ -466,7 +466,7 @@
return null;
}
}
-
+
if (decoder.data < 4)
{
// Need at least four bytes to identify the command
@@ -527,7 +527,7 @@
decoder.command = StompDecoder.COMMAND_COMMIT;
}
/**** added by meddy, 27 april 2011, handle header parser for reply to
websocket protocol ****/
- else if (decoder.workingBuffer[offset+7] == StompDecoder.E)
+ else if (decoder.workingBuffer[offset+7] == StompDecoder.E)
{
if (!decoder.tryIncrement(offset +
StompDecoder.COMMAND_CONNECTED_LENGTH + 1))
{
@@ -535,7 +535,7 @@
}
// CONNECTED
- decoder.command = StompDecoder.COMMAND_CONNECTED;
+ decoder.command = StompDecoder.COMMAND_CONNECTED;
}
/**** end ****/
else
@@ -680,8 +680,8 @@
// Now the headers
boolean isEscaping = false;
- SimpleBytes holder = new SimpleBytes(1024);
-
+ SimpleBytes holder = new SimpleBytes(1024);
+
outer: while (true)
{
byte b = decoder.workingBuffer[decoder.pos++];
@@ -724,7 +724,7 @@
{
throw new HornetQStompException("Encoding
exception", e);
}
-
+
holder.reset();
decoder.inHeaderName = false;
@@ -772,14 +772,14 @@
throw new HornetQStompException("Encoding exception.",
e);
}
holder.reset();
-
+
decoder.headers.put(decoder.headerName, headerValue);
if
(decoder.headerName.equals(StompDecoder.CONTENT_LENGTH_HEADER_NAME))
{
decoder.contentLength = Integer.parseInt(headerValue);
}
-
+
if (decoder.headerName.equals(StompDecoder.CONTENT_TYPE_HEADER_NAME))
{
decoder.contentType = headerValue;
@@ -800,7 +800,7 @@
decoder.whiteSpaceOnly = false;
decoder.headerValueWhitespace = false;
-
+
holder.append(b);
}
}
@@ -814,7 +814,7 @@
}
// Now the body
-
+
byte[] content = null;
if (decoder.contentLength != -1)
@@ -830,7 +830,7 @@
System.arraycopy(decoder.workingBuffer, decoder.pos, content, 0,
decoder.contentLength);
decoder.pos += decoder.contentLength;
-
+
//drain all the rest
if (decoder.bodyStart == -1)
{
@@ -867,7 +867,7 @@
}
}
}
-
+
if (content != null)
{
if (decoder.data > decoder.pos)
@@ -894,5 +894,5 @@
return null;
}
}
-
+
}