[hornetq-commits] JBoss hornetq SVN: r8812 - in branches/HORNETQ-129_STOMP_protocol: src/main/org/hornetq/integration/transports/netty and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jan 20 09:53:17 EST 2010


Author: jmesnil
Date: 2010-01-20 09:53:17 -0500 (Wed, 20 Jan 2010)
New Revision: 8812

Added:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompException.java
Removed:
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java
Modified:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* added code to have a complete Stomp CONNECT + SEND + DISCONNECT use case

Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompException.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompException.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompException.java	2010-01-20 14:53:17 UTC (rev 8812)
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.stomp;
+
+/**
+ * Checked exception signalling a STOMP protocol-level error,
+ * such as a frame received on a connection that has no session.
+ * It carries only a detail message; no cause can be attached.
+ *
+ * @author jmesnil
+ */
+public class StompException extends Exception
+{
+
+   /**
+    * @param string the detail message, later available through getMessage()
+    */
+   public StompException(String string)
+   {
+      super(string);
+   }
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java	2010-01-20 14:38:53 UTC (rev 8811)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java	2010-01-20 14:53:17 UTC (rev 8812)
@@ -28,6 +28,6 @@
 
    public StompFrameDelimiter()
    {
-      super(MAX_DATA_LENGTH, true, Delimiters.nulDelimiter());
+      super(MAX_DATA_LENGTH, false, Delimiters.nulDelimiter());
    }
 }

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-20 14:38:53 UTC (rev 8811)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-20 14:53:17 UTC (rev 8812)
@@ -13,6 +13,9 @@
 
 package org.hornetq.integration.transports.netty;
 
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.HashMap;
@@ -49,8 +52,10 @@
 import org.hornetq.core.server.management.NotificationService;
 import org.hornetq.integration.stomp.Stomp;
 import org.hornetq.integration.stomp.StompDestinationConverter;
+import org.hornetq.integration.stomp.StompException;
 import org.hornetq.integration.stomp.StompFrame;
 import org.hornetq.integration.stomp.StompMarshaller;
+import org.hornetq.jms.client.HornetQBytesMessage;
 import org.hornetq.jms.client.HornetQTextMessage;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.BufferHandler;
@@ -565,15 +570,16 @@
 
       public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
       {
+         StompFrame frame = (StompFrame)e.getMessage();
+         System.out.println(">>> got frame " + frame);
+
+         // need to interact with HornetQ server & session
+         HornetQServer server = serverHandler.getServer();
+         RemotingConnection connection = serverHandler.getRemotingConnection(e.getChannel().getId());
+
          try
          {
-            StompFrame frame = (StompFrame)e.getMessage();
-            System.out.println(">>> got frame " + frame);
 
-            // need to interact with HornetQ server & session
-            HornetQServer server = serverHandler.getServer();
-            RemotingConnection connection = serverHandler.getRemotingConnection(e.getChannel().getId());
-
             String command = frame.getCommand();
 
             StompFrame response = null;
@@ -581,6 +587,10 @@
             {
                response = onConnect(frame, server, connection);
             }
+            if (Stomp.Commands.DISCONNECT.equals(command))
+            {
+               response = onDisconnect(frame, server, connection);
+            }
             else if (Stomp.Commands.SEND.equals(command))
             {
                response = onSend(frame, server, connection);
@@ -598,14 +608,68 @@
                connection.getTransportConnection().write(buffer, true);
             }
          }
+         catch (StompException ex)
+         {
+            // Let the stomp client know about any protocol errors.
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
+            ex.printStackTrace(stream);
+            stream.append(Stomp.NULL + Stomp.NEWLINE);
+            stream.close();
+
+            Map<String, Object> headers = new HashMap<String, Object>();
+            headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
+
+            final String receiptId = (String) frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+            if (receiptId != null) {
+                headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+            }
+
+            StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+            byte[] bytes = marshaller.marshal(errorMessage);
+            HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+            System.out.println("ready to send reply: " + buffer);
+            connection.getTransportConnection().write(buffer, true);
+
+         }
          catch (Exception ex)
          {
             ex.printStackTrace();
          }
       }
 
-      private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection connection) throws HornetQException
+      private void checkConnected(RemotingConnection connection) throws StompException
       {
+         ServerSession session = sessions.get(connection); // look up the session keyed by this connection
+         if (session == null)
+         {
+            throw new StompException("Not connected"); // no session for this connection: reject the frame
+         }
+      }
+      private StompFrame onDisconnect(StompFrame frame, HornetQServer server, RemotingConnection connection) throws StompException
+      { // handles a DISCONNECT frame; the frame and server arguments are not used here
+         checkConnected(connection);
+         // past this point a session was found, so the null test below is expected to always pass
+         ServerSession session = sessions.get(connection);
+         if (session != null)
+         {
+            try
+            {
+               session.close();
+            }
+            catch (Exception e)
+            {
+               throw new StompException(e.getMessage()); // NOTE(review): cause is dropped and remove() below is skipped
+            }
+            sessions.remove(connection); // forget the closed session so later frames fail checkConnected
+         }
+         return null; // no StompFrame is returned for DISCONNECT
+      }
+
+      private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection connection) throws HornetQException, StompException
+      {
+         checkConnected(connection);
+         
          Map<String, Object> headers = frame.getHeaders();
          String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
          /*
@@ -615,6 +679,10 @@
          boolean durable = (Boolean)headers.get(Stomp.Headers.Send.PERSISTENT);
          */
          byte type = HornetQTextMessage.TYPE;
+         if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
+         {
+            type = HornetQBytesMessage.TYPE;
+         }
          long timestamp = System.currentTimeMillis();
          boolean durable = false;
          long expiration = -1;
@@ -625,9 +693,15 @@
          message.setType(type);
          message.setTimestamp(timestamp);
          message.setAddress(address);
-         String content = new String(frame.getContent());
-         System.out.println(">>> got: " + content);
-         message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(content));
+         byte[] content = frame.getContent();
+         if (type == HornetQTextMessage.TYPE)
+         {
+            message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(new String(content)));
+         }
+         else
+         {
+            message.getBodyBuffer().writeBytes(content);
+         }
 
          ServerSession session = sessions.get(connection);
          SessionSendMessage packet = new SessionSendMessage(message, false);

Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-01-20 14:38:53 UTC (rev 8811)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-01-20 14:53:17 UTC (rev 8812)
@@ -79,7 +79,32 @@
         Assert.assertTrue(f.startsWith("CONNECTED"));
         Assert.assertTrue(f.indexOf("response-id:1") >= 0);
     }
+    
+    public void testDisconnectAndError() throws Exception {
 
+       String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
+       sendFrame(connect_frame);
+
+       String f = receiveFrame(10000);
+       Assert.assertTrue(f.startsWith("CONNECTED"));
+       Assert.assertTrue(f.indexOf("response-id:1") >= 0);
+       // DISCONNECT: the variable is reused, and the test does not wait for any reply to it
+       connect_frame = "DISCONNECT\n\n" + Stomp.NULL;
+       sendFrame(connect_frame);
+       // the session was closed by DISCONNECT, so a SEND on this
+       // connection is expected to be answered with an ERROR frame
+       String frame =
+          "SEND\n" +
+                  "destination:/queue/" + getQueueName() + "\n\n" +
+                  "Hello World" +
+                  Stomp.NULL;
+       sendFrame(frame);
+
+       f = receiveFrame(10000);
+       Assert.assertTrue(f.startsWith("ERROR"));
+   }
+
+
     public void testSendMessage() throws Exception {
 
         MessageConsumer consumer = session.createConsumer(queue);
@@ -150,7 +175,47 @@
        long tmsg = message.getJMSTimestamp();
        Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
    }
+    
+    public void testSendMessageWithContentLength() throws Exception {
 
+       MessageConsumer consumer = session.createConsumer(queue);
+
+       String frame =
+               "CONNECT\n" +
+                       "login: brianm\n" +
+                       "passcode: wombats\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+
+       frame = receiveFrame(10000);
+       Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+       byte[] data = new byte[] {1, 2, 3, 4};
+       // a content-length header marks the body as binary, so a BytesMessage is expected on the JMS side
+       frame =
+               "SEND\n" +
+                       "destination:/queue/" + getQueueName() + "\n" +
+                       "content-length:" + data.length + "\n\n" +
+                       new String(data) + // NOTE(review): decodes with the platform default charset
+                       Stomp.NULL;
+
+       sendFrame(frame);
+       // the consumer should see the four payload bytes unchanged and in order
+       BytesMessage message = (BytesMessage) consumer.receive(1000);
+       Assert.assertNotNull(message);
+       assertEquals(data.length, message.getBodyLength());
+       assertEquals(data[0], message.readByte());
+       assertEquals(data[1], message.readByte());
+       assertEquals(data[2], message.readByte());
+       assertEquals(data[3], message.readByte());
+
+       // Make sure that the timestamp is valid - should
+       // be very close to the current time.
+       long tnow = System.currentTimeMillis();
+       long tmsg = message.getJMSTimestamp();
+       Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+   }
+
     public void _testJMSXGroupIdCanBeSet() throws Exception {
 
         MessageConsumer consumer = session.createConsumer(queue);

Deleted: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java	2010-01-20 14:38:53 UTC (rev 8811)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java	2010-01-20 14:53:17 UTC (rev 8812)
@@ -1,90 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.stomp;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQServers;
-import org.hornetq.integration.transports.netty.NettyAcceptor;
-import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
-import org.hornetq.integration.transports.netty.TransportConstants;
-
-import junit.framework.TestCase;
-
-/**
- * A StompTest
- *
- * @author jmesnil
- *
- *
- */
-public class StompTest2 extends TestCase
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private HornetQServer server;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   public void testFoo() throws Exception
-   {
-      Thread.sleep(10);
-   }
-
-   // Package protected ---------------------------------------------
-
-   @Override
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-
-      Configuration config = new ConfigurationImpl();
-      config.setSecurityEnabled(false);
-
-      Map<String, Object> params = new HashMap<String, Object>();
-      params.put(TransportConstants.PROTOCOL_PROP_NAME, TransportConstants.STOMP_PROTOCOL);
-      params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
-      TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
-      config.getAcceptorConfigurations().add(stompTransport);
-
-      server = HornetQServers.newHornetQServer(config);
-      server.start();
-   }
-
-   @Override
-   protected void tearDown() throws Exception
-   {
-      server.stop();
-
-      super.tearDown();
-   }
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}



More information about the hornetq-commits mailing list