[hornetq-commits] JBoss hornetq SVN: r10224 - in branches/stomp_1_1: src/main/org/hornetq/core/remoting/server and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Feb 16 22:41:20 EST 2011
Author: gurkapa
Date: 2011-02-16 22:41:20 -0500 (Wed, 16 Feb 2011)
New Revision: 10224
Modified:
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/stomp_1_1/src/main/org/hornetq/core/remoting/server/RemotingService.java
branches/stomp_1_1/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/stomp_1_1/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://issues.jboss.org/browse/HORNETQ-129 - Implement STOMP 1.1
Adding negotiation and support for heart beating from client to server with stomp 1.1.
Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java 2011-02-17 03:34:32 UTC (rev 10223)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java 2011-02-17 03:41:20 UTC (rev 10224)
@@ -73,6 +73,8 @@
String MESSAGE = "MESSAGE";
String RECEIPT = "RECEIPT";
+
+ String HEART_BEAT = "\n";
}
public interface Headers
@@ -174,6 +176,8 @@
String ACCEPT_VERSION = "accept-version";
String HOST = "host";
+
+ String HEART_BEAT = "heart-beat";
}
public interface Error
@@ -190,6 +194,8 @@
String RESPONSE_ID = "response-id";
String VERSION = "version";
+
+ String HEART_BEAT = "heart-beat";
}
public interface Ack
Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2011-02-17 03:34:32 UTC (rev 10223)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2011-02-17 03:41:20 UTC (rev 10224)
@@ -55,6 +55,10 @@
private boolean destroyed = false;
+ private boolean sendHeartBeat = false;
+
+ private boolean receiveHeartBeat = false;
+
private final long creationTime;
private StompDecoder decoder = new StompDecoder();
@@ -321,6 +325,26 @@
return version;
}
+ /**
+  * Stores the given value in this connection's {@code sendHeartBeat} flag.
+  *
+  * @param sendHeartBeat the new value of the flag
+  */
+ public void setSendHeartBeat(final boolean sendHeartBeat)
+ {
+    this.sendHeartBeat = sendHeartBeat;
+ }
+
+ /**
+  * Returns the current value of this connection's {@code sendHeartBeat} flag.
+  *
+  * @return the value last stored through {@code setSendHeartBeat}, or {@code false} if never set
+  */
+ public boolean isSendHeartBeat()
+ {
+    return this.sendHeartBeat;
+ }
+
+ /**
+  * Stores the given value in this connection's {@code receiveHeartBeat} flag.
+  *
+  * @param receiveHeartBeat the new value of the flag
+  */
+ public void setReceiveHeartBeat(final boolean receiveHeartBeat)
+ {
+    this.receiveHeartBeat = receiveHeartBeat;
+ }
+
+ /**
+  * Returns the current value of this connection's {@code receiveHeartBeat} flag.
+  *
+  * @return the value last stored through {@code setReceiveHeartBeat}, or {@code false} if never set
+  */
+ public boolean isReceiveHeartBeat()
+ {
+    return this.receiveHeartBeat;
+ }
+
private void callFailureListeners(final HornetQException me)
{
final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-02-17 03:34:32 UTC (rev 10223)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-02-17 03:41:20 UTC (rev 10224)
@@ -32,6 +32,7 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.impl.ServerMessageImpl;
@@ -113,8 +114,10 @@
{
StompConnection conn = new StompConnection(connection, this);
- // Note that STOMP has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection
+ // Note that STOMP 1.0 has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection
// will be timed out and closed!
+ // From version 1.1 on there are heart-beats; their intervals are negotiated when a client connects,
+ // and the server prefers its ttl setting when negotiating them.
long ttl = server.getConfiguration().getConnectionTTLOverride();
@@ -578,6 +581,7 @@
String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
String acceptVersion = (String)headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
+ String heartBeats = (String)headers.get(Stomp.Headers.Connect.HEART_BEAT);
HornetQSecurityManager sm = server.getSecurityManager();
@@ -586,7 +590,6 @@
{
sm.validateUser(login, passcode);
}
-
String version = negotiateVersion(acceptVersion);
connection.setLogin(login);
@@ -598,6 +601,7 @@
return createNegotiationFailedFrame();
}
connection.setVersion(version);
+ String serverPreferedHeartBeat = negotiateHeartBeat(heartBeats, connection);
HashMap<String, Object> h = new HashMap<String, Object>();
h.put(Stomp.Headers.Connected.SESSION, connection.getID());
@@ -610,6 +614,9 @@
// Only put this in header if we got a accept-version header.
h.put(Stomp.Headers.Connected.VERSION, version);
}
+ if (serverPreferedHeartBeat != null){
+ h.put(Stomp.Headers.Connected.HEART_BEAT, serverPreferedHeartBeat);
+ }
return new StompFrame(Stomp.Responses.CONNECTED, h);
}
@@ -634,6 +641,42 @@
return Stomp.Versions.V10;
}
+ /**
+  * Negotiates the heart-beat header returned to a client that connects with STOMP 1.1.
+  * <p>
+  * The server does not send heart-beats, so the first value of the reply is always 0. The second value
+  * is the larger of the client's advertised send interval and the connection's current ttl; when the
+  * client's interval is the larger one, the connection ttl is raised to it. If the current ttl is -1
+  * and the client offers no usable interval, the second value is 0.
+  * <p>
+  * A client interval of 0 means the client cannot send heart-beats (STOMP 1.1), so it never replaces
+  * the connection ttl.
+  *
+  * @param heartBeats value of the client's heart-beat header ("cx,cy"), or null if the header is absent
+  * @param connection the connection being negotiated
+  * @return the value for the heart-beat header of the CONNECTED frame, or null if heartBeats is null
+  * @throws StompException if heartBeats is not two non-negative integers separated by a comma
+  * @throws Exception if the remoting service cannot report the current ttl of the connection
+  */
+ private String negotiateHeartBeat(String heartBeats, StompConnection connection) throws Exception
+ {
+    if (heartBeats == null)
+    {
+       return null;
+    }
+
+    final String malformed = "Heart beat parameters are incorrect. Need to be two integers separated by a comma.";
+    String[] splitBeats = heartBeats.split(",");
+    if (splitBeats.length != 2)
+    {
+       throw new StompException(malformed);
+    }
+
+    long clientSendMilliSeconds;
+    long clientReceiveMilliSeconds;
+    try
+    {
+       // Tolerate whitespace around the numbers, e.g. "heart-beat: 100, 200".
+       clientSendMilliSeconds = Long.parseLong(splitBeats[0].trim());
+       // The receive interval is not used (the server sends no heart-beats) but must still be a number.
+       clientReceiveMilliSeconds = Long.parseLong(splitBeats[1].trim());
+    }
+    catch (NumberFormatException e)
+    {
+       // Report bad numbers the same way as a wrong number of fields instead of leaking the parse error.
+       throw new StompException(malformed);
+    }
+    if (clientSendMilliSeconds < 0 || clientReceiveMilliSeconds < 0)
+    {
+       throw new StompException(malformed);
+    }
+
+    long currentTtl = server.getRemotingService().getCurrentTtl(connection);
+    long expectedFromClient;
+    if (clientSendMilliSeconds > 0 && clientSendMilliSeconds > currentTtl)
+    {
+       // The client beats less often than the ttl allows: stretch the ttl so the connection survives.
+       server.getRemotingService().changeConnectionTtl(connection, clientSendMilliSeconds);
+       expectedFromClient = clientSendMilliSeconds;
+    }
+    else if (currentTtl == -1)
+    {
+       expectedFromClient = 0;
+    }
+    else
+    {
+       expectedFromClient = currentTtl;
+    }
+
+    // We do not support sending heart beats from the server
+    return "0," + expectedFromClient;
+ }
+
private StompFrame createNegotiationFailedFrame() throws Exception
{
HashMap<String, Object> h = new HashMap<String, Object>();
Modified: branches/stomp_1_1/src/main/org/hornetq/core/remoting/server/RemotingService.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/remoting/server/RemotingService.java 2011-02-17 03:34:32 UTC (rev 10223)
+++ branches/stomp_1_1/src/main/org/hornetq/core/remoting/server/RemotingService.java 2011-02-17 03:41:20 UTC (rev 10224)
@@ -16,8 +16,10 @@
import java.util.Set;
import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.protocol.stomp.StompConnection;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Connection;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -45,4 +47,8 @@
void freeze();
RemotingConnection getServerSideReplicatingConnection();
+
+ /**
+  * Changes the ttl associated with the given STOMP connection.
+  *
+  * @param connection the connection whose ttl is changed
+  * @param newTtl the new ttl value
+  */
+ void changeConnectionTtl(StompConnection connection, long newTtl);
+
+ /**
+  * Returns the ttl currently associated with the given STOMP connection.
+  *
+  * @param connection the connection to look up
+  * @return the current ttl of the connection
+  * @throws Exception if the ttl cannot be determined for the connection
+  */
+ long getCurrentTtl(StompConnection connection) throws Exception;
}
Modified: branches/stomp_1_1/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-02-17 03:34:32 UTC (rev 10223)
+++ branches/stomp_1_1/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-02-17 03:41:20 UTC (rev 10224)
@@ -27,6 +27,8 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import javax.xml.ws.ProtocolException;
+
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
@@ -34,6 +36,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory;
+import org.hornetq.core.protocol.stomp.StompConnection;
import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
@@ -571,4 +574,22 @@
}
}
+ /**
+  * Replaces the ttl stored in the connection entry registered under the connection's ID.
+  * Does nothing when no entry is registered for that ID.
+  *
+  * @param connection the connection whose entry is updated
+  * @param newTtl the ttl value to store in the entry
+  */
+ public void changeConnectionTtl(StompConnection connection, long newTtl)
+ {
+    final ConnectionEntry target = connections.get(connection.getID());
+    if (target == null)
+    {
+       // No entry registered for this connection: nothing to update.
+       return;
+    }
+    target.ttl = newTtl;
+ }
+
+ /**
+  * Looks up the ttl stored in the connection entry registered under the connection's ID.
+  *
+  * @param connection the connection to look up
+  * @return the ttl held by the connection's entry
+  * @throws Exception if no entry is registered for the connection's ID
+  */
+ public long getCurrentTtl(StompConnection connection) throws Exception
+ {
+    final ConnectionEntry found = connections.get(connection.getID());
+    if (found != null)
+    {
+       return found.ttl;
+    }
+    throw new Exception("No Connection Entry for the connection " + connection.getID());
+ }
}
\ No newline at end of file
Modified: branches/stomp_1_1/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/stomp_1_1/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2011-02-17 03:34:32 UTC (rev 10223)
+++ branches/stomp_1_1/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2011-02-17 03:41:20 UTC (rev 10224)
@@ -105,6 +105,24 @@
Assert.assertTrue(f.indexOf("version:1.1") >= 0);
}
+ /**
+  * Connects with a STOMP 1.1 heart-beat header and checks that the CONNECTED frame carries the
+  * negotiated heart-beat value alongside the response id and version.
+  */
+ public void testHeartBeatNegotiation() throws Exception
+ {
+    final String connectFrame = "CONNECT\n" + "login: brianm\n" +
+                                "passcode: wombats\n" +
+                                "request-id: 1\n" +
+                                "accept-version: 1.1\n" +
+                                "heart-beat: 100000,100000\n" +
+                                "\n" +
+                                Stomp.NULL;
+    sendFrame(connectFrame);
+
+    final String response = receiveFrame(10000);
+    Assert.assertTrue(response.startsWith("CONNECTED"));
+    Assert.assertTrue(response.contains("response-id:1"));
+    Assert.assertTrue(response.contains("version:1.1"));
+    Assert.assertTrue(response.contains("heart-beat:0,100000"));
+ }
+
public void testConnectWithStomp() throws Exception
{
String connect_frame = "STOMP\n" + "login: brianm\n" +
@@ -1288,4 +1306,63 @@
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
sendFrame(frame);
}
+
+ /**
+  * Negotiates a 60 second heart-beat, stays silent for longer than that, and then checks that a
+  * SEND on the same socket produces no frame from the server.
+  */
+ public void testConnectionClosedInLackOfHeartBeat() throws Exception
+ {
+    final String connectFrame = "CONNECT\n" + "login: brianm\n" +
+                                "passcode: wombats\n" +
+                                "request-id: 1\n" +
+                                "accept-version: 1.1\n" +
+                                "heart-beat: 60000,60000\n" +
+                                "\n" +
+                                Stomp.NULL;
+    sendFrame(connectFrame);
+
+    final String connected = receiveFrame(10000);
+    Assert.assertTrue(connected.startsWith("CONNECTED"));
+    Assert.assertTrue(connected.contains("heart-beat:0,60000"));
+
+    // Wait until the connection should be closed.
+    Thread.sleep(63000);
+
+    final String sendFrame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+    sendFrame(sendFrame);
+    try
+    {
+       final String unexpected = receiveFrame(1000);
+       log.info("Received frame: " + unexpected);
+       Assert.fail("No message should have been received as the connection should have been closed.");
+    }
+    catch (SocketTimeoutException expected)
+    {
+       // Expected: nothing comes back from the server, so the read times out.
+    }
+ }
+
+ /**
+  * Negotiates a 60 second heart-beat, sends a single heart-beat before the timeout elapses, and
+  * checks that a SEND issued after the original timeout would have expired is still delivered.
+  */
+ public void testHeartBeatKeepsConnectionAlive() throws Exception
+ {
+    MessageConsumer consumer = session.createConsumer(queue);
+
+    String connect_frame = "CONNECT\n" + "login: brianm\n" +
+                           "passcode: wombats\n" +
+                           "request-id: 1\n" +
+                           "accept-version: 1.1\n" +
+                           "heart-beat: 60000,60000\n" +
+                           "\n" +
+                           Stomp.NULL;
+    sendFrame(connect_frame);
+
+    String f = receiveFrame(10000);
+    Assert.assertTrue(f.startsWith("CONNECTED"));
+    Assert.assertTrue(f.indexOf("heart-beat:0,60000") >= 0);
+
+    // Wait for time out to be near.
+    Thread.sleep(45000);
+    // Actually send the heart-beat; without it the two sleeps add up to more than the negotiated
+    // 60 seconds and the test would not exercise the keep-alive at all.
+    String heartBeat = Stomp.NEWLINE;
+    sendFrame(heartBeat);
+    // Past the original deadline now, but still within 60 seconds of the heart-beat.
+    Thread.sleep(45000);
+
+    String frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+    sendFrame(frame);
+    TextMessage message = (TextMessage)consumer.receive(1000);
+    Assert.assertNotNull(message);
+    Assert.assertEquals("Hello World", message.getText());
+ }
+
}
More information about the hornetq-commits
mailing list