[hornetq-commits] JBoss hornetq SVN: r11446 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Sep 30 00:16:23 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-09-30 00:16:23 -0400 (Fri, 30 Sep 2011)
New Revision: 11446

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java
Log:
Isolating Stomp Messages on topics

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2011-09-29 22:03:44 UTC (rev 11445)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2011-09-30 04:16:23 UTC (rev 11446)
@@ -12,6 +12,7 @@
  */
 package org.hornetq.core.protocol.stomp;
 
+import java.io.UnsupportedEncodingException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -94,14 +95,56 @@
       {
          StompSubscription subscription = subscriptions.get(consumerID);
 
+         StompFrame frame = createFrame(serverMessage, deliveryCount, subscription);
+
+         int length = frame.getEncodedSize();
+
+         if (subscription.isAutoACK())
+         {
+            session.acknowledge(consumerID, serverMessage.getMessageID());
+            session.commit();
+         }
+         else
+         {
+            messagesToAck.put(serverMessage.getMessageID(), new Pair<Long, Integer>(consumerID, length));
+         }
+
+         // Must send AFTER adding to messagesToAck - or could get acked from client BEFORE it's been added!
+         manager.send(connection, frame);
+
+         return length;
+
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         return 0;
+      }
+
+   }
+
+   /**
+    * @param serverMessage
+    * @param deliveryCount
+    * @param subscription
+    * @return
+    * @throws UnsupportedEncodingException
+    * @throws Exception
+    */
+   private StompFrame createFrame(ServerMessage serverMessage, int deliveryCount, StompSubscription subscription) throws UnsupportedEncodingException,
+                                                                                                                 Exception
+   {
+      synchronized (serverMessage)
+      {
          Map<String, Object> headers = new HashMap<String, Object>();
          headers.put(Stomp.Headers.Message.DESTINATION, serverMessage.getAddress().toString());
          if (subscription.getID() != null)
          {
             headers.put(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
          }
+         
          HornetQBuffer buffer = serverMessage.getBodyBuffer();
-
+   
          int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
                                                                  : serverMessage.getEndOfBodyPosition();
          int size = bodyPos - buffer.readerIndex();
@@ -127,31 +170,8 @@
          serverMessage.getBodyBuffer().resetReaderIndex();
          StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
          StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
-
-         int length = frame.getEncodedSize();
-
-         if (subscription.isAutoACK())
-         {
-            session.acknowledge(consumerID, serverMessage.getMessageID());
-            session.commit();
-         }
-         else
-         {
-            messagesToAck.put(serverMessage.getMessageID(), new Pair<Long, Integer>(consumerID, length));
-         }
-
-         // Must send AFTER adding to messagesToAck - or could get acked from client BEFORE it's been added!
-         manager.send(connection, frame);
-
-         return length;
-
+         return frame;
       }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-         return 0;
-      }
-
    }
 
    public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)



More information about the hornetq-commits mailing list