[hornetq-commits] JBoss hornetq SVN: r8817 - in branches/HORNETQ-129_STOMP_protocol: src/main/org/hornetq/integration/transports/netty and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jan 20 12:11:18 EST 2010


Author: jmesnil
Date: 2010-01-20 12:11:18 -0500 (Wed, 20 Jan 2010)
New Revision: 8817

Modified:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* added SUBSCRIBE command

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java	2010-01-20 17:07:39 UTC (rev 8816)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java	2010-01-20 17:11:18 UTC (rev 8817)
@@ -31,8 +31,8 @@
         String CONNECT = "CONNECT";
         String SEND = "SEND";
         String DISCONNECT = "DISCONNECT";
-        String SUBSCRIBE = "SUB";
-        String UNSUBSCRIBE = "UNSUB";
+        String SUBSCRIBE = "SUBSCRIBE";
+        String UNSUBSCRIBE = "UNSUBSCRIBE";
         String BEGIN_TRANSACTION = "BEGIN";
         String COMMIT_TRANSACTION = "COMMIT";
         String ABORT_TRANSACTION = "ABORT";

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-20 17:07:39 UTC (rev 8816)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-20 17:11:18 UTC (rev 8817)
@@ -43,6 +43,9 @@
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.CorePacketDecoder;
 import org.hornetq.core.remoting.impl.ssl.SSLSupport;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.ServerMessage;
@@ -587,7 +590,7 @@
             {
                response = onConnect(frame, server, connection);
             }
-            if (Stomp.Commands.DISCONNECT.equals(command))
+            else if (Stomp.Commands.DISCONNECT.equals(command))
             {
                response = onDisconnect(frame, server, connection);
             }
@@ -595,10 +598,16 @@
             {
                response = onSend(frame, server, connection);
             }
+            else if (Stomp.Commands.SUBSCRIBE.equals(command))
+            {
+               response = onSubscribe(frame, server, connection);
+            }
             else
             {
                log.error("Unsupported Stomp frame: " + frame);
+               response = new StompFrame(Stomp.Responses.ERROR, new HashMap<String, Object>(), ("Unsupported frame: " + command).getBytes());
             }
+            
             if (response != null)
             {
                System.out.println(">>> will reply " + response);
@@ -638,19 +647,44 @@
          }
       }
 
-      private void checkConnected(RemotingConnection connection) throws StompException
+      /**
+       * @param frame
+       * @param server
+       * @param connection
+       * @return
+       * @throws StompException 
+       * @throws HornetQException 
+       */
+      private StompFrame onSubscribe(StompFrame frame, HornetQServer server, RemotingConnection connection) throws StompException, HornetQException
       {
+         Map<String, Object> headers = frame.getHeaders();
+         String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
+         SimpleString queueName = StompDestinationConverter.convertDestination(queue);
+
+         ServerSession session = checkAndGetSession(connection);
+         long id = server.getStorageManager().generateUniqueID();
+         SessionCreateConsumerMessage packet = new SessionCreateConsumerMessage(id , queueName, null, false, false);
+         session.handleCreateConsumer(packet);
+         SessionConsumerFlowCreditMessage credits = new SessionConsumerFlowCreditMessage(id, -1);
+         session.handleReceiveConsumerCredits(credits );
+         session.handleStart(new PacketImpl(PacketImpl.SESS_START));
+
+         return null;
+      }
+
+      private ServerSession checkAndGetSession(RemotingConnection connection) throws StompException
+      {
          ServerSession session = sessions.get(connection);
          if (session == null)
          {
             throw new StompException("Not connected");
          }
+         return session;
       }
+      
       private StompFrame onDisconnect(StompFrame frame, HornetQServer server, RemotingConnection connection) throws StompException
       {
-         checkConnected(connection);
-         
-         ServerSession session = sessions.get(connection);
+         ServerSession session = checkAndGetSession(connection);
          if (session != null)
          {
             try
@@ -668,7 +702,7 @@
 
       private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection connection) throws HornetQException, StompException
       {
-         checkConnected(connection);
+         ServerSession session = checkAndGetSession(connection);
          
          Map<String, Object> headers = frame.getHeaders();
          String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
@@ -703,7 +737,6 @@
             message.getBodyBuffer().writeBytes(content);
          }
 
-         ServerSession session = sessions.get(connection);
          SessionSendMessage packet = new SessionSendMessage(message, false);
          session.handleSend(packet);
          if (headers.containsKey(Stomp.Headers.RECEIPT_REQUESTED))

Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-01-20 17:07:39 UTC (rev 8816)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-01-20 17:11:18 UTC (rev 8817)
@@ -728,6 +728,10 @@
                         Stomp.NULL;
         sendFrame(frame);
 
+        waitForFrameToTakeEffect();
+        // check the message is not committed
+        assertNull(consumer.receive(100));
+        
         frame =
                 "COMMIT\n" +
                         "transaction: tx1\n" +
@@ -899,7 +903,6 @@
         int c = 0;
         for (; ;) {
             c = is.read();
-            System.out.println(c);
             if (c < 0) {
                 throw new IOException("socket closed.");
             }



More information about the hornetq-commits mailing list