[hornetq-commits] JBoss hornetq SVN: r8951 - trunk/src/main/org/hornetq/core/message/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Mar 24 07:56:47 EDT 2010


Author: timfox
Date: 2010-03-24 07:56:47 -0400 (Wed, 24 Mar 2010)
New Revision: 8951

Modified:
   trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
Log:
fixed race with copying and getting encoded buffer

Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2010-03-24 10:06:32 UTC (rev 8950)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2010-03-24 11:56:47 UTC (rev 8951)
@@ -48,11 +48,11 @@
 public abstract class MessageImpl implements MessageInternal
 {
    // Constants -----------------------------------------------------
-     
+
    private static final Logger log = Logger.getLogger(MessageImpl.class);
 
    public static final SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_HQ_ROUTE_TO");
-   
+
    public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
 
    protected long messageID;
@@ -144,18 +144,26 @@
       priority = other.getPriority();
       properties = new TypedProperties(other.getProperties());
 
-      bufferValid = other.bufferValid;
-      endOfBodyPosition = other.endOfBodyPosition;
-      endOfMessagePosition = other.endOfMessagePosition;
-      copied = other.copied;
+      // This MUST be synchronized using the monitor on the other message to prevent it running concurrently
+      // with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to
+      // many subscriptions and bridging to other nodes in a cluster
+      synchronized (other)
+      {
+         bufferValid = other.bufferValid;
+         endOfBodyPosition = other.endOfBodyPosition;
+         endOfMessagePosition = other.endOfMessagePosition;
+         copied = other.copied;
 
-      if (other.buffer != null)
-      {
-         createBody(other.buffer.capacity());
-         // We need to copy the underlying buffer too, since the different messsages thereafter might have different
-         // properties set on them, making their encoding different
-         buffer = other.buffer.copy(0, other.buffer.capacity());
-         buffer.setIndex(other.buffer.readerIndex(), other.buffer.writerIndex());
+         if (other.buffer != null)
+         {
+            createBody(other.buffer.capacity());
+
+            // We need to copy the underlying buffer too, since the different messsages thereafter might have different
+            // properties set on them, making their encoding different
+            buffer = other.buffer.copy(0, other.buffer.capacity());
+
+            buffer.setIndex(other.buffer.readerIndex(), other.buffer.writerIndex());
+         }
       }
    }
 
@@ -214,9 +222,7 @@
       {
          if (buffer instanceof LargeMessageBufferInternal == false)
          {
-            bodyBuffer = new ResetLimitWrappedHornetQBuffer(BUFFER_HEADER_SPACE + DataConstants.SIZE_INT,
-                                                            buffer,
-                                                            this);
+            bodyBuffer = new ResetLimitWrappedHornetQBuffer(BUFFER_HEADER_SPACE + DataConstants.SIZE_INT, buffer, this);
          }
          else
          {
@@ -257,7 +263,6 @@
       this.type = type;
    }
 
-
    public boolean isDurable()
    {
       return durable;
@@ -434,7 +439,7 @@
          buffer.setIndex(0, endOfMessagePosition);
 
          bufferUsed = true;
-         
+
          return buffer;
       }
    }
@@ -800,12 +805,11 @@
 
    // Private -------------------------------------------------------
 
-
    private TypedProperties getProperties()
    {
       return properties;
    }
-   
+
    // This must be synchronized as it can be called concurrently id the message is being delivered concurently to
    // many queues - the first caller in this case will actually encode it
    private synchronized HornetQBuffer encodeToBuffer()
@@ -830,7 +834,7 @@
 
          // Position at end of body and skip past the message end position int.
          // check for enough room in the buffer even tho it is dynamic
-         if((endOfBodyPosition + 4) > buffer.capacity())
+         if ((endOfBodyPosition + 4) > buffer.capacity())
          {
             buffer.setIndex(0, endOfBodyPosition);
             buffer.writeInt(0);



More information about the hornetq-commits mailing list