Author: gaohoward
Date: 2011-09-16 01:20:30 -0400 (Fri, 16 Sep 2011)
New Revision: 11352
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-15
14:34:00 UTC (rev 11351)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-16
05:20:30 UTC (rev 11352)
@@ -99,7 +99,14 @@
if (heartBeat != null)
{
handleHeartBeat(heartBeat);
- response.addHeader(Stomp.Headers.Connected.HEART_BEAT,
"20,100");
+ if (heartBeater == null)
+ {
+ response.addHeader(Stomp.Headers.Connected.HEART_BEAT,
"0,0");
+ }
+ else
+ {
+ response.addHeader(Stomp.Headers.Connected.HEART_BEAT,
heartBeater.getServerHeartBeatValue());
+ }
}
}
else
@@ -147,7 +154,18 @@
public StompFrame onDisconnect(StompFrame frame)
{
log.error("----------------- frame: " + frame);
-
+ if (this.heartBeater != null)
+ {
+ heartBeater.shutdown();
+ try
+ {
+ heartBeater.join();
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("Interrupted while waiting for heart beater to die", e);
+ }
+ }
return null;
}
@@ -371,7 +389,11 @@
@Override
public StompFrame onStomp(StompFrame request)
{
- return onConnect(request);
+ if (!connection.isValid())
+ {
+ return onConnect(request);
+ }
+ return null;
}
@Override
@@ -479,6 +501,9 @@
// 2*serverAcceptPing, disconnect!
private class HeartBeater extends Thread
{
+ final int MIN_SERVER_PING = 200;
+ final int MIN_CLIENT_PING = 500;
+
long serverPing = 0;
long serverAcceptPing = 0;
volatile boolean shutdown = false;
@@ -490,15 +515,26 @@
{
if (clientPing != 0)
{
- serverAcceptPing = clientPing > 100 ? clientPing : 100;
+ serverAcceptPing = clientPing > MIN_CLIENT_PING ? clientPing :
MIN_CLIENT_PING;
}
if (clientAcceptPing != 0)
{
- serverPing = clientAcceptPing > 20 ? clientAcceptPing : 20;
+ serverPing = clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing :
MIN_SERVER_PING;
}
}
+ public synchronized void shutdown()
+ {
+ shutdown = true;
+ this.notify();
+ }
+
+ public String getServerHeartBeatValue()
+ {
+ return String.valueOf(serverPing) + "," +
String.valueOf(serverAcceptPing);
+ }
+
public void pinged()
{
lastPingTime.set(System.currentTimeMillis());
@@ -537,6 +573,9 @@
if (serverAcceptPing != 0)
{
dur2 = System.currentTimeMillis() - lastAccepted.get();
+
+ log.error("-------------------------- dur2 is " + dur2);
+
if (dur2 > (2 * serverAcceptPing))
{
connection.disconnect();
@@ -545,24 +584,51 @@
}
}
- long waitTime1 = serverPing - dur1;
- long waitTime2 = serverAcceptPing * 2 - dur2;
-
- long waitTime = waitTime1 < waitTime2 ? waitTime1 : waitTime2;
+ long waitTime1 = 0;
+ long waitTime2 = 0;
+ if (serverPing > 0)
+ {
+ waitTime1 = serverPing - dur1;
+ }
+
+ if (serverAcceptPing > 0)
+ {
+ waitTime2 = serverAcceptPing * 2 - dur2;
+ }
+
+ long waitTime = 10l;
+
+ if ((waitTime1 > 0) && (waitTime1 > 0))
+ {
+ waitTime = waitTime1 < waitTime2 ? waitTime1 : waitTime2;
+ }
+ else if (waitTime1 > 0)
+ {
+ waitTime = waitTime1;
+ }
+ else if (waitTime2 > 0)
+ {
+ waitTime = waitTime2;
+ }
+
try
{
+ log.error("-------------------waiting for " + waitTime);
this.wait(waitTime);
+ log.error("--------------------wake up " );
}
catch (InterruptedException e)
{
}
}
+ log.error("-------------------------HeartBeat thread shut down!");
}
}
public void pingAccepted()
{
+ log.error("------------------------Ping accepted!");
this.lastAccepted.set(System.currentTimeMillis());
}
}
@@ -572,6 +638,7 @@
{
if (heartBeater != null)
{
+ log.error("----------------------PING accepted: " + request);
heartBeater.pingAccepted();
}
}
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-15
14:34:00 UTC (rev 11351)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-16
05:20:30 UTC (rev 11352)
@@ -195,5 +195,10 @@
{
return version;
}
+
+ public int getFrameQueueSize()
+ {
+ return this.frameQueue.size();
+ }
}
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java 2011-09-15
14:34:00 UTC (rev 11351)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java 2011-09-16
05:20:30 UTC (rev 11352)
@@ -20,14 +20,23 @@
*/
public class ClientStompFrameV11 extends AbstractClientStompFrame
{
+ boolean forceOneway = false;
+
public ClientStompFrameV11(String command)
{
super(command);
}
+
+ public void setForceOneway()
+ {
+ forceOneway = true;
+ }
@Override
public boolean needsReply()
{
+ if (forceOneway) return false;
+
if ("CONNECT".equals(command) || "STOMP".equals(command) ||
headerKeys.contains(HEADER_RECEIPT))
{
return true;
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-15
14:34:00 UTC (rev 11351)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-16
05:20:30 UTC (rev 11352)
@@ -37,6 +37,13 @@
String getVersion();
ClientStompFrame createFrame(String command);
+
+ //number of frames at the queue
+ int getFrameQueueSize();
+
+ void startPinger(long interval);
+
+ void stopPinger();
}
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-09-15
14:34:00 UTC (rev 11351)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-09-16
05:20:30 UTC (rev 11352)
@@ -65,4 +65,18 @@
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public void startPinger(long interval)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void stopPinger()
+ {
+ // TODO Auto-generated method stub
+
+ }
}
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-15
14:34:00 UTC (rev 11351)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-16
05:20:30 UTC (rev 11352)
@@ -27,6 +27,8 @@
public static final String HOST_HEADER = "host";
public static final String VERSION_HEADER = "version";
public static final String RECEIPT_HEADER = "receipt";
+
+ private Pinger pinger;
public StompClientConnectionV11(String host, int port) throws IOException
{
@@ -93,6 +95,7 @@
@Override
public void disconnect() throws IOException, InterruptedException
{
+ stopPinger();
ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
frame.addHeader("receipt", "1");
@@ -114,4 +117,81 @@
return new ClientStompFrameV11(command);
}
+ @Override
+ public void startPinger(long interval)
+ {
+ pinger = new Pinger(interval);
+ pinger.startPing();
+ }
+
+ @Override
+ public void stopPinger()
+ {
+ if (pinger != null)
+ {
+ pinger.stopPing();
+ try
+ {
+ pinger.join();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ pinger = null;
+ }
+ }
+
+ private class Pinger extends Thread
+ {
+ long pingInterval;
+ ClientStompFrameV11 pingFrame;
+ volatile boolean stop = false;
+
+ public Pinger(long interval)
+ {
+ this.pingInterval = interval;
+ pingFrame = (ClientStompFrameV11) createFrame("STOMP");
+ pingFrame.setBody("\n");
+ pingFrame.setForceOneway();
+ }
+
+ public void startPing()
+ {
+ start();
+ }
+
+ public synchronized void stopPing()
+ {
+ stop = true;
+ this.notify();
+ }
+
+ public void run()
+ {
+ synchronized (this)
+ {
+ while (!stop)
+ {
+ try
+ {
+ System.out.println("============sending ping");
+
+ sendFrame(pingFrame);
+
+ System.out.println("Pinged " + pingFrame);
+
+ this.wait(pingInterval);
+ }
+ catch (Exception e)
+ {
+ stop = true;
+ e.printStackTrace();
+ }
+ }
+ System.out.println("Pinger stopped");
+ }
+ }
+ }
+
}
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-15
14:34:00 UTC (rev 11351)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-16
05:20:30 UTC (rev 11352)
@@ -17,6 +17,8 @@
*/
package org.hornetq.tests.integration.stomp.v11;
+import java.io.IOException;
+
import org.hornetq.core.logging.Logger;
import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
import org.hornetq.tests.integration.stomp.util.StompClientConnection;
@@ -350,6 +352,116 @@
newConn.disconnect();
}
+
+ public void testHeartBeat() throws Exception
+ {
+ //no heart beat at all if heat-beat absent
+ ClientStompFrame frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+
+ ClientStompFrame reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ Thread.sleep(5000);
+
+ assertEquals(0, connV11.getFrameQueueSize());
+
+ connV11.disconnect();
+
+ //no heart beat for (0,0)
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "0,0");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ assertEquals("0,0", reply.getHeader("heart-beat"));
+
+ Thread.sleep(5000);
+
+ assertEquals(0, connV11.getFrameQueueSize());
+
+ connV11.disconnect();
+
+ //heart-beat (1,0), should receive a min client ping accepted by server
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "1,0");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ assertEquals("0,500", reply.getHeader("heart-beat"));
+
+ Thread.sleep(2000);
+
+ //now server side should be disconnected because we didn't send ping for 2 sec
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+ frame.setBody("Hello World");
+
+ //send will fail
+ try
+ {
+ connV11.sendFrame(frame);
+ fail("connection should have been destroyed by now");
+ }
+ catch (IOException e)
+ {
+ //ignore
+ }
+
+ //heart-beat (1,0), start a ping, then send a message, should be ok.
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "1,0");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ assertEquals("0,500", reply.getHeader("heart-beat"));
+
+ System.out.println("========== start pinger!");
+
+ connV11.startPinger(500);
+
+ Thread.sleep(2000);
+
+ //now server side should be disconnected because we didn't send ping for 2 sec
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+ frame.setBody("Hello World");
+
+ //send will be ok
+ connV11.sendFrame(frame);
+
+ connV11.stopPinger();
+
+ connV11.disconnect();
+
+ }
+
}