[hornetq-commits] JBoss hornetq SVN: r8828 - in branches/HORNETQ-129_STOMP_protocol: src/main/org/hornetq/integration/transports/netty and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Jan 21 10:40:24 EST 2010


Author: jmesnil
Date: 2010-01-21 10:40:24 -0500 (Thu, 21 Jan 2010)
New Revision: 8828

Modified:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* pass testSubscribeWithAutoAck

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java	2010-01-21 14:46:21 UTC (rev 8827)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java	2010-01-21 15:40:24 UTC (rev 8828)
@@ -70,6 +70,35 @@
       }
    }
 
+   public static String toStomp(String address) throws HornetQException
+   {
+      if (address == null)
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
+      }
+      else if (address.startsWith(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX))
+      {
+         return "/queue/" + address.substring(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX.length(), address.length());
+      }
+      else if (address.startsWith(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX))
+      {
+         return "/temp-queue/" + address.substring(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length(), address.length());
+      }
+      else if (address.startsWith(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX))
+      {
+         return "/topic/" + address.substring(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX.length(), address.length());
+      }
+      else if (address.startsWith(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX))
+      {
+         return "/temp-topic/" + address.substring(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length(), address.length());
+      }
+      else
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal address name: [" + address +
+                                                                    "] -- Acceptable address must comply to JMS semantics");
+      }
+   }
+
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-21 14:46:21 UTC (rev 8827)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-21 15:40:24 UTC (rev 8828)
@@ -46,6 +46,7 @@
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.SessionCallback;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.server.management.Notification;
 import org.hornetq.core.server.management.NotificationService;
@@ -325,9 +326,7 @@
             else
             {
                ChannelPipelineSupport.addHornetQCodecFilter(pipeline, handler);
-               pipeline.addLast("handler", new HornetQServerChannelHandler(channelGroup,
-                                                                           handler,
-                                                                           new Listener()));
+               pipeline.addLast("handler", new HornetQServerChannelHandler(channelGroup, handler, new Listener()));
             }
 
             return pipeline;
@@ -595,9 +594,11 @@
             else
             {
                log.error("Unsupported Stomp frame: " + frame);
-               response = new StompFrame(Stomp.Responses.ERROR, new HashMap<String, Object>(), ("Unsupported frame: " + command).getBytes());
+               response = new StompFrame(Stomp.Responses.ERROR,
+                                         new HashMap<String, Object>(),
+                                         ("Unsupported frame: " + command).getBytes());
             }
-            
+
             if (response != null)
             {
                System.out.println(">>> will reply " + response);
@@ -613,15 +614,15 @@
             ByteArrayOutputStream baos = new ByteArrayOutputStream();
             PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
             ex.printStackTrace(stream);
-            stream.append(Stomp.NULL + Stomp.NEWLINE);
             stream.close();
 
             Map<String, Object> headers = new HashMap<String, Object>();
             headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
 
-            final String receiptId = (String) frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
-            if (receiptId != null) {
-                headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+            final String receiptId = (String)frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+            if (receiptId != null)
+            {
+               headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
             }
 
             StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
@@ -645,7 +646,9 @@
        * @throws StompException 
        * @throws HornetQException 
        */
-      private StompFrame onSubscribe(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception, StompException, HornetQException
+      private StompFrame onSubscribe(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception,
+                                                                                                           StompException,
+                                                                                                           HornetQException
       {
          Map<String, Object> headers = frame.getHeaders();
          String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
@@ -669,7 +672,7 @@
          }
          return session;
       }
-      
+
       private StompFrame onDisconnect(StompFrame frame, HornetQServer server, RemotingConnection connection) throws StompException
       {
          ServerSession session = checkAndGetSession(connection);
@@ -691,7 +694,7 @@
       private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception
       {
          ServerSession session = checkAndGetSession(connection);
-         
+
          Map<String, Object> headers = frame.getHeaders();
          String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
          /*
@@ -738,7 +741,7 @@
          }
       }
 
-      private StompFrame onConnect(StompFrame frame, HornetQServer server, CoreRemotingConnection connection) throws Exception
+      private StompFrame onConnect(StompFrame frame, HornetQServer server, final CoreRemotingConnection connection) throws Exception
       {
          Map<String, Object> headers = frame.getHeaders();
          String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
@@ -756,6 +759,62 @@
                               false,
                               false);
          ServerSession session = server.getSession(name);
+         session.setCallback(new SessionCallback()
+         {
+            public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
+            {
+            }
+
+            public int sendMessage(ServerMessage serverMessage, long consumerID, int deliveryCount)
+            {
+               try
+               {
+                  Map<String, Object> headers = new HashMap<String, Object>();
+                  headers.put(Stomp.Headers.Message.DESTINATION,
+                              StompDestinationConverter.toStomp(serverMessage.getAddress().toString()));
+                  byte[] data = new byte[] {};
+                  if (serverMessage.getType() == HornetQTextMessage.TYPE)
+                  {
+                     SimpleString text = serverMessage.getBodyBuffer().readNullableSimpleString();
+                     if (text != null)
+                     {
+                        data = text.toString().getBytes();
+                     }
+                  }
+                  StompFrame msg = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
+                  System.out.println("SENDING : " + msg);
+                  byte[] bytes = marshaller.marshal(msg);
+                  HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+                  connection.getTransportConnection().write(buffer, true);
+
+                  return bytes.length;
+
+               }
+               catch (Exception e)
+               {
+                  e.printStackTrace();
+                  return 0;
+               }
+
+            }
+
+            public int sendLargeMessageContinuation(long consumerID,
+                                                    byte[] body,
+                                                    boolean continues,
+                                                    boolean requiresResponse)
+            {
+               return 0;
+            }
+
+            public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
+            {
+               return 0;
+            }
+
+            public void closed()
+            {
+            }
+         });
          sessions.put(connection, session);
          System.out.println(">>> created session " + session);
          HashMap<String, Object> h = new HashMap<String, Object>();

Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-01-21 14:46:21 UTC (rev 8827)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-01-21 15:40:24 UTC (rev 8828)
@@ -341,6 +341,8 @@
 
         frame = receiveFrame(10000);
         Assert.assertTrue(frame.startsWith("MESSAGE"));
+        Assert.assertTrue(frame.indexOf("destination:") > 0);
+        Assert.assertTrue(frame.indexOf(getName()) > 0);
 
         frame =
                 "DISCONNECT\n" +
@@ -408,7 +410,7 @@
 
         MessageProducer producer = session.createProducer(queue);
         TextMessage message = session.createTextMessage("Hello World");
-        message.setStringProperty("s", "value");
+        message.setStringProperty("S", "value");
         message.setBooleanProperty("n", false);
         message.setByteProperty("byte", (byte) 9);
         message.setDoubleProperty("d", 2.0);
@@ -420,6 +422,15 @@
 
         frame = receiveFrame(10000);
         Assert.assertTrue(frame.startsWith("MESSAGE"));
+        Assert.assertTrue(frame.indexOf("S:") > 0);
+        Assert.assertTrue(frame.indexOf("n:") > 0);
+        Assert.assertTrue(frame.indexOf("byte:") > 0);
+        Assert.assertTrue(frame.indexOf("d:") > 0);
+        Assert.assertTrue(frame.indexOf("f:") > 0);
+        Assert.assertTrue(frame.indexOf("i:") > 0);
+        Assert.assertTrue(frame.indexOf("l:") > 0);
+        Assert.assertTrue(frame.indexOf("s:") > 0);
+        Assert.assertTrue(frame.indexOf("Hello World") > 0);
 
 //        System.out.println("out: "+frame);
 
@@ -908,6 +919,11 @@
             }
             else if (c == 0) {
                 c = is.read();
+                if (c != '\n')
+                {
+                   byte[] ba = inputBuffer.toByteArray();
+                   System.out.println(new String(ba, "UTF-8"));
+                }
                 Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
                 byte[] ba = inputBuffer.toByteArray();
                 inputBuffer.reset();



More information about the hornetq-commits mailing list