Author: clebert.suconic(a)jboss.com
Date: 2011-09-30 00:16:23 -0400 (Fri, 30 Sep 2011)
New Revision: 11446
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java
Log:
Isolating Stomp Messages on topics
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-29 22:03:44 UTC (rev 11445)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-30 04:16:23 UTC (rev 11446)
@@ -12,6 +12,7 @@
*/
package org.hornetq.core.protocol.stomp;
+import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -94,14 +95,56 @@
{
StompSubscription subscription = subscriptions.get(consumerID);
+ StompFrame frame = createFrame(serverMessage, deliveryCount, subscription);
+
+ int length = frame.getEncodedSize();
+
+ if (subscription.isAutoACK())
+ {
+ session.acknowledge(consumerID, serverMessage.getMessageID());
+ session.commit();
+ }
+ else
+ {
+ messagesToAck.put(serverMessage.getMessageID(), new Pair<Long,
Integer>(consumerID, length));
+ }
+
+ // Must send AFTER adding to messagesToAck - or could get acked from client BEFORE it's been added!
+ manager.send(connection, frame);
+
+ return length;
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ return 0;
+ }
+
+ }
+
+ /**
+ * @param serverMessage
+ * @param deliveryCount
+ * @param subscription
+ * @return
+ * @throws UnsupportedEncodingException
+ * @throws Exception
+ */
+ private StompFrame createFrame(ServerMessage serverMessage, int deliveryCount,
StompSubscription subscription) throws UnsupportedEncodingException,
+
Exception
+ {
+ synchronized (serverMessage)
+ {
Map<String, Object> headers = new HashMap<String, Object>();
headers.put(Stomp.Headers.Message.DESTINATION,
serverMessage.getAddress().toString());
if (subscription.getID() != null)
{
headers.put(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
}
+
HornetQBuffer buffer = serverMessage.getBodyBuffer();
-
+
int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
:
serverMessage.getEndOfBodyPosition();
int size = bodyPos - buffer.readerIndex();
@@ -127,31 +170,8 @@
serverMessage.getBodyBuffer().resetReaderIndex();
StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
deliveryCount);
-
- int length = frame.getEncodedSize();
-
- if (subscription.isAutoACK())
- {
- session.acknowledge(consumerID, serverMessage.getMessageID());
- session.commit();
- }
- else
- {
- messagesToAck.put(serverMessage.getMessageID(), new Pair<Long,
Integer>(consumerID, length));
- }
-
- // Must send AFTER adding to messagesToAck - or could get acked from client BEFORE it's been added!
- manager.send(connection, frame);
-
- return length;
-
+ return frame;
}
- catch (Exception e)
- {
- e.printStackTrace();
- return 0;
- }
-
}
public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean
continues, boolean requiresResponse)
Show replies by date