[hornetq-commits] JBoss hornetq SVN: r10224 - in branches/stomp_1_1: src/main/org/hornetq/core/remoting/server and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Feb 16 22:41:20 EST 2011


Author: gurkapa
Date: 2011-02-16 22:41:20 -0500 (Wed, 16 Feb 2011)
New Revision: 10224

Modified:
   branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java
   branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
   branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   branches/stomp_1_1/src/main/org/hornetq/core/remoting/server/RemotingService.java
   branches/stomp_1_1/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   branches/stomp_1_1/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://issues.jboss.org/browse/HORNETQ-129 - Implement STOMP 1.1
Adds heart-beat negotiation and support for client-to-server heart-beating in STOMP 1.1.



Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java	2011-02-17 03:34:32 UTC (rev 10223)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java	2011-02-17 03:41:20 UTC (rev 10224)
@@ -73,6 +73,8 @@
       String MESSAGE = "MESSAGE";
 
       String RECEIPT = "RECEIPT";
+
+      String HEART_BEAT = "\n";
    }
 
    public interface Headers
@@ -174,6 +176,8 @@
          String ACCEPT_VERSION = "accept-version";
          
          String HOST = "host";
+         
+         String HEART_BEAT = "heart-beat";
       }
 
       public interface Error
@@ -190,6 +194,8 @@
          String RESPONSE_ID = "response-id";
          
          String VERSION = "version";
+         
+         String HEART_BEAT = "heart-beat";
       }
 
       public interface Ack

Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2011-02-17 03:34:32 UTC (rev 10223)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2011-02-17 03:41:20 UTC (rev 10224)
@@ -55,6 +55,10 @@
 
    private boolean destroyed = false;
    
+   private boolean sendHeartBeat = false;
+   
+   private boolean receiveHeartBeat = false;
+   
    private final long creationTime;
 
    private StompDecoder decoder = new StompDecoder();
@@ -321,6 +325,26 @@
       return version;
    }
 
+   public void setSendHeartBeat(boolean sendHeartBeat)
+   {
+      this.sendHeartBeat = sendHeartBeat;
+   }
+
+   public boolean isSendHeartBeat()
+   {
+      return sendHeartBeat;
+   }
+
+   public void setReceiveHeartBeat(boolean receiveHeartBeat)
+   {
+      this.receiveHeartBeat = receiveHeartBeat;
+   }
+
+   public boolean isReceiveHeartBeat()
+   {
+      return receiveHeartBeat;
+   }
+
    private void callFailureListeners(final HornetQException me)
    {
       final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);

Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2011-02-17 03:34:32 UTC (rev 10223)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2011-02-17 03:41:20 UTC (rev 10224)
@@ -32,6 +32,7 @@
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.ServerSession;
 import org.hornetq.core.server.impl.ServerMessageImpl;
@@ -113,8 +114,10 @@
    {
       StompConnection conn = new StompConnection(connection, this);
 
-      // Note that STOMP has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection
+      // Note that STOMP 1.0 has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection
       // will be timed out and closed!
+      // From version 1.1 there are heartbeats; however, their intervals are negotiated when clients connect,
+      // and the server will prefer the ttl setting for this.
 
       long ttl = server.getConfiguration().getConnectionTTLOverride();
 
@@ -578,6 +581,7 @@
       String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
       String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
       String acceptVersion = (String)headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
+      String heartBeats = (String)headers.get(Stomp.Headers.Connect.HEART_BEAT);
 
       HornetQSecurityManager sm = server.getSecurityManager();
       
@@ -586,7 +590,6 @@
       {
          sm.validateUser(login, passcode);
       }
-      
       String version = negotiateVersion(acceptVersion);
 
       connection.setLogin(login);
@@ -598,6 +601,7 @@
          return createNegotiationFailedFrame();
       }
       connection.setVersion(version);
+      String serverPreferedHeartBeat = negotiateHeartBeat(heartBeats, connection);
 
       HashMap<String, Object> h = new HashMap<String, Object>();
       h.put(Stomp.Headers.Connected.SESSION, connection.getID());
@@ -610,6 +614,9 @@
          // Only put this in header if we got a accept-version header.
          h.put(Stomp.Headers.Connected.VERSION, version);
       }
+      if (serverPreferedHeartBeat != null){
+         h.put(Stomp.Headers.Connected.HEART_BEAT, serverPreferedHeartBeat);
+      }
       return new StompFrame(Stomp.Responses.CONNECTED, h);
    }
 
@@ -634,6 +641,42 @@
       return Stomp.Versions.V10;
    }
 
+   private String negotiateHeartBeat(String heartBeats, StompConnection connection) throws Exception
+   {
+      StringBuilder agreedHeartBeats;
+      if (heartBeats == null)
+      {
+         return null;
+      } 
+      else 
+      {
+         agreedHeartBeats = new StringBuilder();
+         String[] splitBeats = heartBeats.split(",");
+         if (splitBeats.length != 2)
+         {
+            throw new StompException("Heart beat parameters are incorrect. Need to be two integers separated by a comma.");
+         }
+         // We do not support sending heart beats from the server
+         agreedHeartBeats.append("0,");
+         long clientSendMiliSeconds = Long.parseLong(splitBeats[0]);
+         long currentTtl = server.getRemotingService().getCurrentTtl(connection);
+         if (clientSendMiliSeconds > currentTtl)
+         {
+            server.getRemotingService().changeConnectionTtl(connection, clientSendMiliSeconds);
+            agreedHeartBeats.append(clientSendMiliSeconds);
+         } 
+         else if (currentTtl == -1)
+         {
+            agreedHeartBeats.append("0");
+         }
+         else 
+         {
+            agreedHeartBeats.append(currentTtl);
+         }
+      }
+      return agreedHeartBeats.toString();
+   }
+   
    private StompFrame createNegotiationFailedFrame() throws Exception
    {
       HashMap<String, Object> h = new HashMap<String, Object>();

Modified: branches/stomp_1_1/src/main/org/hornetq/core/remoting/server/RemotingService.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/remoting/server/RemotingService.java	2011-02-17 03:34:32 UTC (rev 10223)
+++ branches/stomp_1_1/src/main/org/hornetq/core/remoting/server/RemotingService.java	2011-02-17 03:41:20 UTC (rev 10224)
@@ -16,8 +16,10 @@
 import java.util.Set;
 
 import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.protocol.stomp.StompConnection;
 import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Connection;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -45,4 +47,8 @@
    void freeze();
 
    RemotingConnection getServerSideReplicatingConnection();
+   
+   void changeConnectionTtl(StompConnection connection, long newTtl);
+   
+   long getCurrentTtl(StompConnection connection) throws Exception;
 }

Modified: branches/stomp_1_1/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2011-02-17 03:34:32 UTC (rev 10223)
+++ branches/stomp_1_1/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2011-02-17 03:41:20 UTC (rev 10224)
@@ -27,6 +27,8 @@
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import javax.xml.ws.ProtocolException;
+
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
@@ -34,6 +36,7 @@
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory;
+import org.hornetq.core.protocol.stomp.StompConnection;
 import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
 import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.remoting.impl.netty.TransportConstants;
@@ -571,4 +574,22 @@
       }
    }
 
+   public void changeConnectionTtl(StompConnection connection, long newTtl)
+   {
+      ConnectionEntry entry = connections.get(connection.getID());
+      if (entry != null)
+      {
+         entry.ttl = newTtl;
+      }
+   }
+
+   public long getCurrentTtl(StompConnection connection) throws Exception
+   {
+      ConnectionEntry entry = connections.get(connection.getID());
+      if (entry == null)
+      {
+         throw new Exception("No Connection Entry for the connection " + connection.getID());
+      }
+      return entry.ttl;
+   }
 }
\ No newline at end of file

Modified: branches/stomp_1_1/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/stomp_1_1/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2011-02-17 03:34:32 UTC (rev 10223)
+++ branches/stomp_1_1/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2011-02-17 03:41:20 UTC (rev 10224)
@@ -105,6 +105,24 @@
       Assert.assertTrue(f.indexOf("version:1.1") >= 0);
    }
 
+   public void testHeartBeatNegotiation() throws Exception
+   {
+      String connect_frame = "CONNECT\n" + "login: brianm\n" +
+                             "passcode: wombats\n" +
+                             "request-id: 1\n" +
+                             "accept-version: 1.1\n" +
+                             "heart-beat: 100000,100000\n" +
+                             "\n" +
+                             Stomp.NULL;
+      sendFrame(connect_frame);
+
+      String f = receiveFrame(10000);
+      Assert.assertTrue(f.startsWith("CONNECTED"));
+      Assert.assertTrue(f.indexOf("response-id:1") >= 0);
+      Assert.assertTrue(f.indexOf("version:1.1") >= 0);
+      Assert.assertTrue(f.indexOf("heart-beat:0,100000") >= 0);
+   }
+
    public void testConnectWithStomp() throws Exception
    {
       String connect_frame = "STOMP\n" + "login: brianm\n" +
@@ -1288,4 +1306,63 @@
       frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
       sendFrame(frame);
    }
+   
+   public void testConnectionClosedInLackOfHeartBeat() throws Exception
+   {
+      String connect_frame = "CONNECT\n" + "login: brianm\n" +
+                             "passcode: wombats\n" +
+                             "request-id: 1\n" +
+                             "accept-version: 1.1\n" +
+                             "heart-beat: 60000,60000\n" +
+                             "\n" +
+                             Stomp.NULL;
+      sendFrame(connect_frame);
+
+      String f = receiveFrame(10000);
+      Assert.assertTrue(f.startsWith("CONNECTED"));
+      Assert.assertTrue(f.indexOf("heart-beat:0,60000") >= 0);
+      // Wait until the connection should be closed.
+      Thread.sleep(63000);
+      String frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+      sendFrame(frame);
+      try
+      {
+         frame = receiveFrame(1000);
+         log.info("Received frame: " + frame);
+         Assert.fail("No message should have been received as the connection should have been closed.");
+      }
+      catch (SocketTimeoutException e)
+      {
+      }
+   }
+
+   public void testHeartBeatKeepsConnectionAlive() throws Exception
+   {
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      String connect_frame = "CONNECT\n" + "login: brianm\n" +
+                             "passcode: wombats\n" +
+                             "request-id: 1\n" +
+                             "accept-version: 1.1\n" +
+                             "heart-beat: 60000,60000\n" +
+                             "\n" +
+                             Stomp.NULL;
+      sendFrame(connect_frame);
+
+      String f = receiveFrame(10000);
+      Assert.assertTrue(f.startsWith("CONNECTED"));
+      Assert.assertTrue(f.indexOf("heart-beat:0,60000") >= 0);
+      // Wait for time out to be near.
+      Thread.sleep(45000);
+      String frame = Stomp.NEWLINE;
+      Thread.sleep(45000);
+      
+      
+      frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+      sendFrame(frame);
+      TextMessage message = (TextMessage)consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals("Hello World", message.getText());
+   }
+
 }



More information about the hornetq-commits mailing list