Author: gaohoward
Date: 2011-11-01 23:05:57 -0400 (Tue, 01 Nov 2011)
New Revision: 11629
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV10.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
Fix Stomp heart-beat issue:
It should send a 'new line' byte rather than a STOMP frame, as per spec
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-11-02
01:16:08 UTC (rev 11628)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-11-02
03:05:57 UTC (rev 11629)
@@ -58,6 +58,8 @@
protected boolean disconnect;
+ protected boolean isPing;
+
public StompFrame(String command)
{
this(command, false);
@@ -109,6 +111,16 @@
out += body;
return out;
}
+
+ public boolean isPing()
+ {
+ return isPing;
+ }
+
+ public void setPing(boolean ping)
+ {
+ isPing = ping;
+ }
public HornetQBuffer toHornetQBuffer() throws Exception
{
@@ -123,6 +135,12 @@
buffer = HornetQBuffers.dynamicBuffer(512);
}
+ if (isPing())
+ {
+ buffer.writeByte((byte)10);
+ return buffer;
+ }
+
StringBuffer head = new StringBuffer();
head.append(command);
head.append(Stomp.NEWLINE);
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-11-02
01:16:08 UTC (rev 11628)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-11-02
03:05:57 UTC (rev 11629)
@@ -126,6 +126,7 @@
{
response = new HornetQStompException("Encoding error.",
e).getFrame();
}
+
return response;
}
@@ -193,7 +194,7 @@
@Override
public StompFrame onSend(StompFrame frame)
- {
+ {
StompFrame response = null;
try
{
@@ -492,7 +493,7 @@
public StompFrame createPingFrame() throws UnsupportedEncodingException
{
StompFrame frame = new StompFrame(Stomp.Commands.STOMP);
- frame.setBody("\n");
+ frame.setPing(true);
return frame;
}
@@ -658,31 +659,37 @@
buffer.readBytes(decoder.workingBuffer, decoder.data, readable);
decoder.data += readable;
-
+
if (decoder.command == null)
{
- if (decoder.data < 4)
+ int offset = 0;
+
+ //check for ping
+ while (decoder.workingBuffer[offset] == StompDecoder.NEW_LINE)
{
- // Need at least four bytes to identify the command
- // - up to 3 bytes for the command name + potentially another byte for a
leading \n
-
- return null;
- }
-
- int offset;
-
- if (decoder.workingBuffer[0] == StompDecoder.NEW_LINE)
- {
+ if (heartBeater != null)
+ {
+ //client ping
+ heartBeater.pingAccepted();
+ }
// Yuck, some badly behaved STOMP clients add a \n *after* the terminating
NUL char at the end of the
// STOMP
// frame this can manifest as an extra \n at the beginning when the next
STOMP frame is read - we need to
// deal
// with this
- offset = 1;
+ offset++;
+ if (offset >= decoder.data)
+ {
+ decoder.data = 0;
+ return null;
+ }
}
- else
+
+ if (decoder.data < 4)
{
- offset = 0;
+ // Need at least four bytes to identify the command
+ // - up to 3 bytes for the command name + potentially another byte for a
leading \n
+ return null;
}
byte b = decoder.workingBuffer[offset];
@@ -1025,7 +1032,7 @@
}
// Now the body
-
+
byte[] content = null;
if (decoder.contentLength != -1)
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java 2011-11-02
01:16:08 UTC (rev 11628)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java 2011-11-02
03:05:57 UTC (rev 11629)
@@ -25,7 +25,7 @@
* @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
*
*/
-public class AbstractClientStompFrame implements ClientStompFrame
+public abstract class AbstractClientStompFrame implements ClientStompFrame
{
protected static final String HEADER_RECEIPT = "receipt";
@@ -56,6 +56,13 @@
@Override
public ByteBuffer toByteBuffer() throws UnsupportedEncodingException
{
+ if (isPing())
+ {
+ ByteBuffer buffer = ByteBuffer.allocateDirect(1);
+ buffer.put((byte)0x0A);
+ buffer.rewind();
+ return buffer;
+ }
StringBuffer sb = new StringBuffer();
sb.append(command + "\n");
int n = headers.size();
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-11-02
01:16:08 UTC (rev 11628)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-11-02
03:05:57 UTC (rev 11629)
@@ -177,8 +177,8 @@
if (validateFrame(frame))
{
- frameQueue.offer(frame);
- receiveList.clear();
+ frameQueue.offer(frame);
+ receiveList.clear();
}
else
{
@@ -188,13 +188,25 @@
}
else
{
- receiveList.add(b);
+ if (b == 10 && receiveList.size() == 0)
+ {
+ //may be a ping
+ incrementServerPing();
+ }
+ else
+ {
+ receiveList.add(b);
+ }
}
}
//clear readbuffer
readBuffer.rewind();
}
+ protected void incrementServerPing()
+ {
+ }
+
private boolean validateFrame(ClientStompFrame f) throws UnsupportedEncodingException
{
String h = f.getHeader("content-length");
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java 2011-11-02
01:16:08 UTC (rev 11628)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java 2011-11-02
03:05:57 UTC (rev 11629)
@@ -42,4 +42,6 @@
public ByteBuffer toByteBufferWithExtra(String str) throws
UnsupportedEncodingException;
+ public boolean isPing();
+
}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV10.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV10.java 2011-11-02
01:16:08 UTC (rev 11628)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV10.java 2011-11-02
03:05:57 UTC (rev 11629)
@@ -25,5 +25,11 @@
{
super(command);
}
+
+ @Override
+ public boolean isPing()
+ {
+ return false;
+ }
}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java 2011-11-02
01:16:08 UTC (rev 11628)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java 2011-11-02
03:05:57 UTC (rev 11629)
@@ -21,6 +21,7 @@
public class ClientStompFrameV11 extends AbstractClientStompFrame
{
boolean forceOneway = false;
+ boolean isPing = false;
public ClientStompFrameV11(String command)
{
@@ -43,4 +44,14 @@
}
return false;
}
+
+ public void setPing(boolean b)
+ {
+ isPing = b;
+ }
+
+ public boolean isPing()
+ {
+ return isPing;
+ }
}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-11-02
01:16:08 UTC (rev 11628)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-11-02
03:05:57 UTC (rev 11629)
@@ -52,6 +52,8 @@
void destroy();
ClientStompFrame sendWickedFrame(ClientStompFrame frame) throws IOException,
InterruptedException;
+
+ int getServerPingNumber();
}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-11-02
01:16:08 UTC (rev 11628)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-11-02
03:05:57 UTC (rev 11629)
@@ -88,14 +88,16 @@
@Override
public void startPinger(long interval)
{
- // TODO Auto-generated method stub
-
}
@Override
public void stopPinger()
{
- // TODO Auto-generated method stub
-
}
+
+ @Override
+ public int getServerPingNumber()
+ {
+ return 0;
+ }
}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-11-02
01:16:08 UTC (rev 11628)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-11-02
03:05:57 UTC (rev 11629)
@@ -29,6 +29,7 @@
public static final String RECEIPT_HEADER = "receipt";
private Pinger pinger;
+ private volatile int serverPingCounter;
public StompClientConnectionV11(String host, int port) throws IOException
{
@@ -126,6 +127,7 @@
public void disconnect() throws IOException, InterruptedException
{
stopPinger();
+
ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
frame.addHeader("receipt", "1");
@@ -184,6 +186,7 @@
pingFrame = (ClientStompFrameV11) createFrame("STOMP");
pingFrame.setBody("\n");
pingFrame.setForceOneway();
+ pingFrame.setPing(true);
}
public void startPing()
@@ -205,12 +208,8 @@
{
try
{
- System.out.println("============sending ping");
-
sendFrame(pingFrame);
- System.out.println("Pinged " + pingFrame);
-
this.wait(pingInterval);
}
catch (Exception e)
@@ -219,9 +218,20 @@
e.printStackTrace();
}
}
- System.out.println("Pinger stopped");
}
}
}
+ @Override
+ public int getServerPingNumber()
+ {
+ return serverPingCounter;
+ }
+
+ protected void incrementServerPing()
+ {
+ serverPingCounter++;
+ }
+
+
}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-11-02
01:16:08 UTC (rev 11628)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-11-02
03:05:57 UTC (rev 11629)
@@ -55,6 +55,7 @@
protected void tearDown() throws Exception
{
+ System.out.println("Connection 11 : " + connV11.isConnected());
if (connV11.isConnected())
{
connV11.disconnect();
@@ -238,6 +239,7 @@
//unsub
ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
unsubFrame.addHeader("id", "a-sub");
+ newConn.sendFrame(unsubFrame);
newConn.disconnect();
}
@@ -491,9 +493,8 @@
ClientStompFrame reply = connV11.sendFrame(frame);
assertEquals("CONNECTED", reply.getCommand());
-
assertEquals("500,500", reply.getHeader("heart-beat"));
-
+
connV11.disconnect();
//heart-beat (500,1000)
@@ -518,7 +519,7 @@
Thread.sleep(10000);
//now check the frame size
- int size = connV11.getFrameQueueSize();
+ int size = connV11.getServerPingNumber();
System.out.println("ping received: " + size);
@@ -533,12 +534,211 @@
//send will be ok
connV11.sendFrame(frame);
- connV11.stopPinger();
-
connV11.disconnect();
+ }
+
+ public void testSendWithHeartBeatsAndReceive() throws Exception
+ {
+ StompClientConnection newConn = null;
+ try
+ {
+ ClientStompFrame frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,1000");
+ frame.addHeader("accept-version", "1.0,1.1");
+ connV11.sendFrame(frame);
+
+ connV11.startPinger(500);
+
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+
+ for (int i = 0; i < 10; i++)
+ {
+ frame.setBody("Hello World " + i + "!");
+ connV11.sendFrame(frame);
+ Thread.sleep(500);
+ }
+
+ // subscribe
+ newConn = StompClientConnectionFactory.createClientConnection("1.1",
+ hostname, port);
+ newConn.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ int cnt = 0;
+
+ frame = newConn.receiveFrame();
+
+ while (frame != null)
+ {
+ cnt++;
+ Thread.sleep(500);
+ frame = newConn.receiveFrame(5000);
+ }
+
+ assertEquals(10, cnt);
+
+ // unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ newConn.sendFrame(unsubFrame);
+ }
+ finally
+ {
+ if (newConn != null)
+ newConn.disconnect();
+ connV11.disconnect();
+ }
}
+ public void testSendAndReceiveWithHeartBeats() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+
+ for (int i = 0; i < 10; i++)
+ {
+ frame.setBody("Hello World " + i + "!");
+ connV11.sendFrame(frame);
+ Thread.sleep(500);
+ }
+
+ //subscribe
+ StompClientConnection newConn =
StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ try
+ {
+ frame = newConn.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,1000");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ newConn.sendFrame(frame);
+
+ newConn.startPinger(500);
+
+ Thread.sleep(500);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ int cnt = 0;
+
+ frame = newConn.receiveFrame();
+
+ while (frame != null)
+ {
+ cnt++;
+ Thread.sleep(500);
+ frame = newConn.receiveFrame(5000);
+ }
+
+ assertEquals(10, cnt);
+
+ // unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ newConn.sendFrame(unsubFrame);
+ }
+ finally
+ {
+ newConn.disconnect();
+ }
+ }
+
+ public void testSendWithHeartBeatsAndReceiveWithHeartBeats() throws Exception
+ {
+ StompClientConnection newConn = null;
+ try
+ {
+ ClientStompFrame frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,1000");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ connV11.sendFrame(frame);
+
+ connV11.startPinger(500);
+
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+
+ for (int i = 0; i < 10; i++)
+ {
+ frame.setBody("Hello World " + i + "!");
+ connV11.sendFrame(frame);
+ Thread.sleep(500);
+ }
+
+ // subscribe
+ newConn = StompClientConnectionFactory.createClientConnection("1.1",
+ hostname, port);
+ frame = newConn.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,1000");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ newConn.sendFrame(frame);
+
+ newConn.startPinger(500);
+
+ Thread.sleep(500);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ int cnt = 0;
+
+ frame = newConn.receiveFrame();
+
+ while (frame != null)
+ {
+ cnt++;
+ Thread.sleep(500);
+ frame = newConn.receiveFrame(5000);
+ }
+ assertEquals(10, cnt);
+
+ // unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ newConn.sendFrame(unsubFrame);
+ }
+ finally
+ {
+ if (newConn != null)
+ newConn.disconnect();
+ connV11.disconnect();
+ }
+ }
+
public void testNack() throws Exception
{
connV11.connect(defUser, defPass);