[hornetq-commits] JBoss hornetq SVN: r8951 - trunk/src/main/org/hornetq/core/message/impl.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Mar 24 07:56:47 EDT 2010
Author: timfox
Date: 2010-03-24 07:56:47 -0400 (Wed, 24 Mar 2010)
New Revision: 8951
Modified:
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
Log:
fixed race with copying and getting encoded buffer
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-03-24 10:06:32 UTC (rev 8950)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-03-24 11:56:47 UTC (rev 8951)
@@ -48,11 +48,11 @@
public abstract class MessageImpl implements MessageInternal
{
// Constants -----------------------------------------------------
-
+
private static final Logger log = Logger.getLogger(MessageImpl.class);
public static final SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_HQ_ROUTE_TO");
-
+
public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
protected long messageID;
@@ -144,18 +144,26 @@
priority = other.getPriority();
properties = new TypedProperties(other.getProperties());
- bufferValid = other.bufferValid;
- endOfBodyPosition = other.endOfBodyPosition;
- endOfMessagePosition = other.endOfMessagePosition;
- copied = other.copied;
+ // This MUST be synchronized using the monitor on the other message to prevent it running concurrently
+ // with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to
+ // many subscriptions and bridging to other nodes in a cluster
+ synchronized (other)
+ {
+ bufferValid = other.bufferValid;
+ endOfBodyPosition = other.endOfBodyPosition;
+ endOfMessagePosition = other.endOfMessagePosition;
+ copied = other.copied;
- if (other.buffer != null)
- {
- createBody(other.buffer.capacity());
- // We need to copy the underlying buffer too, since the different messsages thereafter might have different
- // properties set on them, making their encoding different
- buffer = other.buffer.copy(0, other.buffer.capacity());
- buffer.setIndex(other.buffer.readerIndex(), other.buffer.writerIndex());
+ if (other.buffer != null)
+ {
+ createBody(other.buffer.capacity());
+
+ // We need to copy the underlying buffer too, since the different messsages thereafter might have different
+ // properties set on them, making their encoding different
+ buffer = other.buffer.copy(0, other.buffer.capacity());
+
+ buffer.setIndex(other.buffer.readerIndex(), other.buffer.writerIndex());
+ }
}
}
@@ -214,9 +222,7 @@
{
if (buffer instanceof LargeMessageBufferInternal == false)
{
- bodyBuffer = new ResetLimitWrappedHornetQBuffer(BUFFER_HEADER_SPACE + DataConstants.SIZE_INT,
- buffer,
- this);
+ bodyBuffer = new ResetLimitWrappedHornetQBuffer(BUFFER_HEADER_SPACE + DataConstants.SIZE_INT, buffer, this);
}
else
{
@@ -257,7 +263,6 @@
this.type = type;
}
-
public boolean isDurable()
{
return durable;
@@ -434,7 +439,7 @@
buffer.setIndex(0, endOfMessagePosition);
bufferUsed = true;
-
+
return buffer;
}
}
@@ -800,12 +805,11 @@
// Private -------------------------------------------------------
-
private TypedProperties getProperties()
{
return properties;
}
-
+
// This must be synchronized as it can be called concurrently id the message is being delivered concurently to
// many queues - the first caller in this case will actually encode it
private synchronized HornetQBuffer encodeToBuffer()
@@ -830,7 +834,7 @@
// Position at end of body and skip past the message end position int.
// check for enough room in the buffer even tho it is dynamic
- if((endOfBodyPosition + 4) > buffer.capacity())
+ if ((endOfBodyPosition + 4) > buffer.capacity())
{
buffer.setIndex(0, endOfBodyPosition);
buffer.writeInt(0);
More information about the hornetq-commits
mailing list