[hornetq-commits] JBoss hornetq SVN: r8810 - in branches/HORNETQ-129_STOMP_protocol: src/main/org/hornetq/core/server and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jan 20 08:24:56 EST 2010


Author: jmesnil
Date: 2010-01-20 08:24:55 -0500 (Wed, 20 Jan 2010)
New Revision: 8810

Added:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompDestinationConverter.java
Modified:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/message/impl/MessageImpl.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerMessage.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* barely enough ugly code to make StompTest.testSendMessage pass this time

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/message/impl/MessageImpl.java	2010-01-20 11:02:05 UTC (rev 8809)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/message/impl/MessageImpl.java	2010-01-20 13:24:55 UTC (rev 8810)
@@ -249,7 +249,14 @@
    {
       return type;
    }
+   
 
+   public void setType(byte type)
+   {
+      this.type = type;
+   }
+
+
    public boolean isDurable()
    {
       return durable;

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerMessage.java	2010-01-20 11:02:05 UTC (rev 8809)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerMessage.java	2010-01-20 13:24:55 UTC (rev 8810)
@@ -62,4 +62,6 @@
    boolean storeIsPaging();
 
    void encodeMessageIDToBuffer();
+
+   void setType(byte type);
 }

Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompDestinationConverter.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompDestinationConverter.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompDestinationConverter.java	2010-01-20 13:24:55 UTC (rev 8810)
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.stomp;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.jms.client.HornetQQueue;
+import org.hornetq.jms.client.HornetQTemporaryQueue;
+import org.hornetq.jms.client.HornetQTemporaryTopic;
+import org.hornetq.jms.client.HornetQTopic;
+
+/**
+ * Maps a STOMP destination name to a HornetQ address: the /queue/, /topic/, /temp-queue/ or
+ * /temp-topic/ prefix picks the HornetQ JMS destination class whose createAddressFromName gets
+ * the rest of the name; a null or unprefixed name raises an ILLEGAL_STATE HornetQException.
+ *
+ * @author jmesnil
+ */
+public class StompDestinationConverter
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   public static SimpleString convertDestination(String name) throws HornetQException
+   {
+      if (name == null)
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
+      }
+      else if (name.startsWith("/queue/"))
+      {
+         String queueName = name.substring("/queue/".length());
+         return HornetQQueue.createAddressFromName(queueName);
+      }
+      else if (name.startsWith("/topic/"))
+      {
+         String topicName = name.substring("/topic/".length());
+         return HornetQTopic.createAddressFromName(topicName);
+      }
+      else if (name.startsWith("/temp-queue/"))
+      {
+         String tempName = name.substring("/temp-queue/".length());
+         return HornetQTemporaryQueue.createAddressFromName(tempName);
+      }
+      else if (name.startsWith("/temp-topic/"))
+      {
+         String tempName = name.substring("/temp-topic/".length());
+         return HornetQTemporaryTopic.createAddressFromName(tempName);
+      }
+      else
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal destination name: [" + name +
+                                                                    "] -- StompConnect destinations " +
+                                                                    "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+      }
+   }
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-20 11:02:05 UTC (rev 8809)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-20 13:24:55 UTC (rev 8810)
@@ -40,13 +40,18 @@
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.CorePacketDecoder;
 import org.hornetq.core.remoting.impl.ssl.SSLSupport;
+import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.server.management.Notification;
 import org.hornetq.core.server.management.NotificationService;
 import org.hornetq.integration.stomp.Stomp;
+import org.hornetq.integration.stomp.StompDestinationConverter;
 import org.hornetq.integration.stomp.StompFrame;
 import org.hornetq.integration.stomp.StompMarshaller;
+import org.hornetq.jms.client.HornetQTextMessage;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
@@ -160,7 +165,7 @@
       this.handler = handler;
 
       this.serverHandler = serverHandler;
-      
+
       this.listener = listener;
 
       sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME,
@@ -308,7 +313,10 @@
             if (protocol.equals(TransportConstants.STOMP_PROTOCOL))
             {
                ChannelPipelineSupport.addStompStack(pipeline, serverHandler);
-               pipeline.addLast("handler", new StompChannelHandler(serverHandler, new StompMarshaller(), channelGroup, new Listener()));
+               pipeline.addLast("handler", new StompChannelHandler(serverHandler,
+                                                                   new StompMarshaller(),
+                                                                   channelGroup,
+                                                                   new Listener()));
             }
             else
             {
@@ -512,19 +520,20 @@
    private final class HornetQServerChannelHandler extends AbstractServerChannelHandler
    {
       private PacketDecoder decoder;
+
       private BufferHandler handler;
 
       HornetQServerChannelHandler(final ChannelGroup group,
-                                   final PacketDecoder decoder,
-                                   final BufferHandler handler,
-                                   final ConnectionLifeCycleListener listener)
-       {
-          super(group, listener);
-          
-          this.decoder = decoder;
-          this.handler = handler;
-       }
+                                  final PacketDecoder decoder,
+                                  final BufferHandler handler,
+                                  final ConnectionLifeCycleListener listener)
+      {
+         super(group, listener);
 
+         this.decoder = decoder;
+         this.handler = handler;
+      }
+
       @Override
       public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
       {
@@ -532,14 +541,16 @@
 
          handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer), decoder);
       }
-      
+
    }
-   
+
    @ChannelPipelineCoverage("one")
    public final class StompChannelHandler extends AbstractServerChannelHandler
    {
       private final StompMarshaller marshaller;
 
+      private final Map<RemotingConnection, ServerSession> sessions = new HashMap<RemotingConnection, ServerSession>();
+
       private ServerHolder serverHandler;
 
       public StompChannelHandler(ServerHolder serverHolder,
@@ -554,31 +565,82 @@
 
       public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
       {
-         StompFrame frame = (StompFrame)e.getMessage();
-         System.out.println(">>> got frame " + frame);
+         try
+         {
+            StompFrame frame = (StompFrame)e.getMessage();
+            System.out.println(">>> got frame " + frame);
 
-         // need to interact with HornetQ server & session
-         HornetQServer server = serverHandler.getServer();
-         RemotingConnection connection = serverHandler.getRemotingConnection(e.getChannel().getId());
+            // need to interact with HornetQ server & session
+            HornetQServer server = serverHandler.getServer();
+            RemotingConnection connection = serverHandler.getRemotingConnection(e.getChannel().getId());
 
-         String command = frame.getCommand();
+            String command = frame.getCommand();
 
-         StompFrame response = null;
-         if (Stomp.Commands.CONNECT.equals(command))
+            StompFrame response = null;
+            if (Stomp.Commands.CONNECT.equals(command))
+            {
+               response = onConnect(frame, server, connection);
+            }
+            else if (Stomp.Commands.SEND.equals(command))
+            {
+               response = onSend(frame, server, connection);
+            }
+            else
+            {
+               log.error("Unsupported Stomp frame: " + frame);
+            }
+            if (response != null)
+            {
+               System.out.println(">>> will reply " + response);
+               byte[] bytes = marshaller.marshal(response);
+               HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+               System.out.println("ready to send reply: " + buffer);
+               connection.getTransportConnection().write(buffer, true);
+            }
+         }
+         catch (Exception ex)
          {
-            response = onConnect(frame, server, connection);
+            ex.printStackTrace();
          }
-         else
+      }
+
+      private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection connection) throws HornetQException
+      {
+         Map<String, Object> headers = frame.getHeaders();
+         String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
+         /*
+         String type = (String)headers.get(Stomp.Headers.Send.TYPE);
+         long expiration = (Long)headers.get(Stomp.Headers.Send.EXPIRATION_TIME);
+         byte priority = (Byte)headers.get(Stomp.Headers.Send.PRIORITY);
+         boolean durable = (Boolean)headers.get(Stomp.Headers.Send.PERSISTENT);
+         */
+         byte type = HornetQTextMessage.TYPE;
+         long timestamp = System.currentTimeMillis();
+         boolean durable = false;
+         long expiration = -1;
+         byte priority = 9;
+         SimpleString address = StompDestinationConverter.convertDestination(queue);
+
+         ServerMessage message = new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
+         message.setType(type);
+         message.setTimestamp(timestamp);
+         message.setAddress(address);
+         String content = new String(frame.getContent());
+         System.out.println(">>> got: " + content);
+         message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(content));
+
+         ServerSession session = sessions.get(connection);
+         SessionSendMessage packet = new SessionSendMessage(message, false);
+         session.handleSend(packet);
+         if (headers.containsKey(Stomp.Headers.RECEIPT_REQUESTED))
          {
-            log.error("Unsupported Stomp frame: " + frame);
+            Map<String, Object> h = new HashMap<String, Object>();
+            h.put(Stomp.Headers.Response.RECEIPT_ID, headers.get(Stomp.Headers.RECEIPT_REQUESTED));
+            return new StompFrame(Stomp.Responses.RECEIPT, h, new byte[] {});
          }
-         if (response != null)
+         else
          {
-            System.out.println(">>> will reply " + response);
-            byte[] bytes = marshaller.marshal(response);
-            HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
-            System.out.println("ready to send reply: " + buffer);
-            connection.getTransportConnection().write(buffer, true);
+            return null;
          }
       }
 
@@ -603,6 +665,7 @@
                               false,
                               -1);
          ServerSession session = server.getSession(name);
+         sessions.put(connection, session);
          System.out.println(">>> created session " + session);
          HashMap<String, Object> h = new HashMap<String, Object>();
          h.put(Stomp.Headers.Connected.SESSION, name);
@@ -610,12 +673,11 @@
          return new StompFrame(Stomp.Responses.CONNECTED, h, new byte[] {});
       }
    }
-   
+
    @ChannelPipelineCoverage("one")
    public abstract class AbstractServerChannelHandler extends HornetQChannelHandler
    {
-      protected AbstractServerChannelHandler(final ChannelGroup group,
-                                  final ConnectionLifeCycleListener listener)
+      protected AbstractServerChannelHandler(final ChannelGroup group, final ConnectionLifeCycleListener listener)
       {
          super(group, listener);
       }

Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-01-20 11:02:05 UTC (rev 8809)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-01-20 13:24:55 UTC (rev 8810)
@@ -41,11 +41,10 @@
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
 import org.hornetq.core.server.HornetQServer;
@@ -61,7 +60,7 @@
 import org.hornetq.jms.server.impl.JMSServerManagerImpl;
 
 public class StompTest extends TestCase {
-    private static final transient Log log = LogFactory.getLog(StompTest.class);
+    private static final transient Logger log = Logger.getLogger(StompTest.class);
     private int port = 61613;
     private Socket stompSocket;
     private ByteArrayOutputStream inputBuffer;
@@ -102,7 +101,7 @@
                         Stomp.NULL;
 
         sendFrame(frame);
-
+        
         TextMessage message = (TextMessage) consumer.receive(1000);
         Assert.assertNotNull(message);
         Assert.assertEquals("Hello World", message.getText());
@@ -113,7 +112,45 @@
         long tmsg = message.getJMSTimestamp();
         Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
     }
+    
+    public void testSendMessageWithReceipt() throws Exception {
 
+       MessageConsumer consumer = session.createConsumer(queue);
+
+       String frame =
+               "CONNECT\n" +
+                       "login: brianm\n" +
+                       "passcode: wombats\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+
+       frame = receiveFrame(10000);
+       Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+       frame =
+               "SEND\n" +
+                       "destination:/queue/" + getQueueName() + "\n" +
+                       "receipt: 1234\n\n" +
+                       "Hello World" +
+                       Stomp.NULL;
+
+       sendFrame(frame);
+
+       String f = receiveFrame(10000);
+       Assert.assertTrue(f.startsWith("RECEIPT"));
+       Assert.assertTrue(f.indexOf("receipt-id:1234") >= 0);
+
+       TextMessage message = (TextMessage) consumer.receive(1000);
+       Assert.assertNotNull(message);
+       Assert.assertEquals("Hello World", message.getText());
+
+       // Make sure that the timestamp is valid - should
+       // be very close to the current time.
+       long tnow = System.currentTimeMillis();
+       long tmsg = message.getJMSTimestamp();
+       Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+   }
+
     public void _testJMSXGroupIdCanBeSet() throws Exception {
 
         MessageConsumer consumer = session.createConsumer(queue);

Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2010-01-20 11:02:05 UTC (rev 8809)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2010-01-20 13:24:55 UTC (rev 8810)
@@ -663,6 +663,15 @@
          // TODO Auto-generated method stub
          return 0;
       }
+      
+      /* (non-Javadoc)
+       * @see org.hornetq.core.server.ServerMessage#setType(byte)
+       */
+      public void setType(byte type)
+      {
+         // TODO Auto-generated method stub
+         
+      }
 
       public HornetQBuffer getWholeBuffer()
       {



More information about the hornetq-commits mailing list