[hornetq-commits] JBoss hornetq SVN: r10142 - in branches/stomp_1_1: tests/src/org/hornetq/tests/integration/stomp and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Jan 24 22:17:51 EST 2011


Author: gurkapa
Date: 2011-01-24 22:17:50 -0500 (Mon, 24 Jan 2011)
New Revision: 10142

Modified:
   branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java
   branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
   branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
   branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
   branches/stomp_1_1/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://issues.jboss.org/browse/HORNETQ-553 Implement STOMP 1.1 Specification

Implementation of the version negotiation introduced in 1.1 spec


Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java	2011-01-25 02:55:02 UTC (rev 10141)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java	2011-01-25 03:17:50 UTC (rev 10142)
@@ -28,9 +28,18 @@
 
    String NEWLINE = "\n";
 
+   public static interface Versions
+   {
+      String V10 = "1.0";
+      
+      String V11 = "1.1";
+   }
+   
    public static interface Commands
    {
       String CONNECT = "CONNECT";
+      
+      String STOMP = "STOMP";
 
       String SEND = "SEND";
 
@@ -75,6 +84,8 @@
       String TRANSACTION = "transaction";
 
       String CONTENT_LENGTH = "content-length";
+      
+      String CONTENT_TYPE = "content-type";
 
       public interface Response
       {
@@ -159,11 +170,17 @@
          String CLIENT_ID = "client-id";
 
          String REQUEST_ID = "request-id";
+         
+         String ACCEPT_VERSION = "accept-version";
+         
+         String HOST = "host";
       }
 
       public interface Error
       {
          String MESSAGE = "message";
+         
+         String VERSION = "version";
       }
 
       public interface Connected
@@ -171,6 +188,8 @@
          String SESSION = "session";
 
          String RESPONSE_ID = "response-id";
+         
+         String VERSION = "version";
       }
 
       public interface Ack

Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2011-01-25 02:55:02 UTC (rev 10141)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2011-01-25 03:17:50 UTC (rev 10142)
@@ -48,6 +48,8 @@
    private String passcode;
 
    private String clientID;
+   
+   private String version;
 
    private boolean valid;
 
@@ -309,6 +311,16 @@
       this.valid = valid;
    }
 
+   public void setVersion(String version)
+   {
+      this.version = version;
+   }
+
+   public String getVersion()
+   {
+      return version;
+   }
+
    private void callFailureListeners(final HornetQException me)
    {
       final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);

Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java	2011-01-25 02:55:02 UTC (rev 10141)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java	2011-01-25 03:17:50 UTC (rev 10142)
@@ -59,6 +59,10 @@
    private static final String COMMAND_SEND = "SEND";
 
    private static final int COMMAND_SEND_LENGTH = COMMAND_SEND.length();
+   
+   private static final String COMMAND_STOMP = "STOMP";
+   
+   private static final int COMMAND_STOMP_LENGTH = COMMAND_STOMP.length();
 
    private static final String COMMAND_SUBSCRIBE = "SUBSCRIBE";
 
@@ -82,6 +86,8 @@
 
    private static final byte S = (byte)'S';
 
+   private static final byte T = (byte)'T';
+   
    private static final byte U = (byte)'U';
 
    private static final byte HEADER_SEPARATOR = (byte)':';
@@ -267,6 +273,16 @@
 
                   // SEND
                   command = COMMAND_SEND;
+               } 
+               else if (workingBuffer[offset + 1] == T)
+               {
+                  if (!tryIncrement(offset + COMMAND_STOMP_LENGTH + 1))
+                  {
+                     return null;
+                  }
+                  
+                  // STOMP
+                  command = COMMAND_STOMP;
                }
                else
                {

Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2011-01-25 02:55:02 UTC (rev 10141)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2011-01-25 03:17:50 UTC (rev 10142)
@@ -50,12 +50,12 @@
 class StompProtocolManager implements ProtocolManager
 {
    // Constants -----------------------------------------------------
-
+   
    private static final Logger log = Logger.getLogger(StompProtocolManager.class);
 
    // TODO use same value than HornetQConnection
    private static final String CONNECTION_ID_PROP = "__HQ_CID";
-
+   
    // Attributes ----------------------------------------------------
 
    private final HornetQServer server;
@@ -178,7 +178,7 @@
 
             StompFrame response = null;
 
-            if (Stomp.Commands.CONNECT.equals(command))
+            if (Stomp.Commands.CONNECT.equals(command) || Stomp.Commands.STOMP.equals(command))
             {
                response = onConnect(request, conn);
             }
@@ -577,6 +577,7 @@
       String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
       String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
       String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
+      String acceptVersion = (String)headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
 
       HornetQSecurityManager sm = server.getSecurityManager();
       
@@ -585,11 +586,18 @@
       {
          sm.validateUser(login, passcode);
       }
+      
+      String version = negotiateVersion(acceptVersion);
 
       connection.setLogin(login);
       connection.setPasscode(passcode);
       connection.setClientID(clientID);
       connection.setValid(true);
+      if (version == null){
+         // client and server do not have any version in common. Return Error frame
+         return createNegotiationFailedFrame();
+      }
+      connection.setVersion(version);
 
       HashMap<String, Object> h = new HashMap<String, Object>();
       h.put(Stomp.Headers.Connected.SESSION, connection.getID());
@@ -597,9 +605,45 @@
       {
          h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
       }
+      if (acceptVersion != null)
+      {
+         // Only put this in header if we got an accept-version header.
+         h.put(Stomp.Headers.Connected.VERSION, version);
+      }
       return new StompFrame(Stomp.Responses.CONNECTED, h);
    }
 
+   private String negotiateVersion(String acceptVersion)
+   {
+      if (acceptVersion != null)
+      {
+         String bestVersion = null;
+         for(String v : acceptVersion.split(","))
+         {
+            if(Stomp.Versions.V11.equals(v.trim()))
+            {
+               bestVersion = Stomp.Versions.V11;
+            } 
+            else if (Stomp.Versions.V10.equals(v.trim()) && bestVersion == null) 
+            {
+               bestVersion = Stomp.Versions.V10;
+            }
+         }
+         return bestVersion;
+      }
+      return Stomp.Versions.V10;
+   }
+
+   private StompFrame createNegotiationFailedFrame() throws Exception
+   {
+      HashMap<String, Object> h = new HashMap<String, Object>();
+      h.put(Stomp.Headers.Error.VERSION, Stomp.Versions.V10 + "," + Stomp.Versions.V11);
+      h.put(Stomp.Headers.CONTENT_TYPE, "text/plain");
+
+      StringBuffer eMess = new StringBuffer("Supported protocol versions are " + Stomp.Versions.V10 + " " + Stomp.Versions.V11);
+      return new StompFrame(Stomp.Responses.ERROR, h, eMess.toString().getBytes("UTF-8"));
+   }
+
    public void cleanup(StompConnection connection)
    {
       connection.setValid(false);

Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompUtils.java	2011-01-25 02:55:02 UTC (rev 10141)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompUtils.java	2011-01-25 03:17:50 UTC (rev 10142)
@@ -130,6 +130,7 @@
          headers.put(name.toString(), message.getObjectProperty(name));
       }
    }
+
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------

Modified: branches/stomp_1_1/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/stomp_1_1/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2011-01-25 02:55:02 UTC (rev 10141)
+++ branches/stomp_1_1/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2011-01-25 03:17:50 UTC (rev 10142)
@@ -88,7 +88,56 @@
       Assert.assertTrue(f.startsWith("CONNECTED"));
       Assert.assertTrue(f.indexOf("response-id:1") >= 0);
    }
+   
+   public void testV11Connect() throws Exception
+   {
+      String connect_frame = "CONNECT\n" + "login: brianm\n" +
+                             "passcode: wombats\n" +
+                             "request-id: 1\n" +
+                             "accept-version: 1.0,1.1\n" +
+                             "\n" +
+                             Stomp.NULL;
+      sendFrame(connect_frame);
 
+      String f = receiveFrame(10000);
+      Assert.assertTrue(f.startsWith("CONNECTED"));
+      Assert.assertTrue(f.indexOf("response-id:1") >= 0);
+      Assert.assertTrue(f.indexOf("version:1.1") >= 0);
+   }
+
+   public void testConnectWithStomp() throws Exception
+   {
+      String connect_frame = "STOMP\n" + "login: brianm\n" +
+                             "passcode: wombats\n" +
+                             "request-id: 1\n" +
+                             "accept-version: 1.0,1.1\n" +
+                             "\n" +
+                             Stomp.NULL;
+      sendFrame(connect_frame);
+
+      String f = receiveFrame(10000);
+      Assert.assertTrue(f.startsWith("CONNECTED"));
+      Assert.assertTrue(f.indexOf("response-id:1") >= 0);
+      Assert.assertTrue(f.indexOf("version:1.1") >= 0);
+   }
+
+   public void testProtocolNegotiationFail() throws Exception
+   {
+      String connect_frame = "CONNECT\n" + "login: brianm\n" +
+                             "passcode: wombats\n" +
+                             "request-id: 1\n" +
+                             "accept-version: 1.2\n" +
+                             "\n" +
+                             Stomp.NULL;
+      sendFrame(connect_frame);
+
+      String f = receiveFrame(10000);
+      Assert.assertTrue(f.startsWith("ERROR"));
+      Assert.assertTrue(f.indexOf("version:1.0,1.1") >= 0);
+      Assert.assertTrue(f.indexOf("content-type:text/plain") >= 0);
+      Assert.assertTrue(f.indexOf("Supported protocol versions are") >= 0);
+   }
+
    public void testDisconnectAndError() throws Exception
    {
 



More information about the hornetq-commits mailing list