[hornetq-commits] JBoss hornetq SVN: r8403 - in branches/20-optimisation: src/main/org/hornetq/core/message and 8 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Nov 25 15:11:08 EST 2009
Author: timfox
Date: 2009-11-25 15:11:08 -0500 (Wed, 25 Nov 2009)
New Revision: 8403
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
optimisation
Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -697,6 +697,7 @@
log.trace("Setting up flowControlSize to " + message.getPacketSize() + " on message = " + clMessage);
}
+ // log.info("setting flow control size as " + message.getPacketSize());
clMessage.setFlowControlSize(message.getPacketSize());
consumer.handleMessage(clMessage);
Modified: branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/Message.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/Message.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -14,12 +14,10 @@
package org.hornetq.core.message;
import java.io.InputStream;
-import java.io.OutputStream;
import java.util.Map;
import java.util.Set;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.exception.HornetQException;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
@@ -74,12 +72,8 @@
void decodeFromBuffer(HornetQBuffer buffer);
- HornetQBuffer encodeToBuffer();
-
int getEndOfMessagePosition();
- // void setEndOfBodyPosition();
-
int getEndOfBodyPosition();
void checkCopy();
@@ -88,7 +82,7 @@
void resetCopied();
-// void resetEndOfBodyPosition();
+ HornetQBuffer getEncodedBuffer();
// Properties
// ------------------------------------------------------------------
Modified: branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -96,6 +96,18 @@
protected HornetQBuffer buffer;
+ protected ResetLimitWrappedHornetQBuffer bodyBuffer;
+
+ protected boolean bufferValid;
+
+ private int endOfBodyPosition = -1;
+
+ private int endOfMessagePosition;
+
+ private boolean copied = true;
+
+ private boolean bufferUsed;
+
// Constructors --------------------------------------------------
protected MessageImpl()
@@ -135,27 +147,38 @@
createBody(initialMessageBufferSize);
}
- private void createBody(final int initialMessageBufferSize)
+ /*
+ * Copy constructor
+ */
+ protected MessageImpl(final MessageImpl other)
{
- buffer = HornetQBuffers.dynamicBuffer(initialMessageBufferSize);
+ messageID = other.getMessageID();
+ destination = other.getDestination();
+ type = other.getType();
+ durable = other.isDurable();
+ expiration = other.getExpiration();
+ timestamp = other.getTimestamp();
+ priority = other.getPriority();
+ properties = new TypedProperties(other.getProperties());
- // There's a bug in netty which means a dynamic buffer won't resize until you write a byte
- buffer.writeByte((byte)0);
+ bufferValid = other.bufferValid;
+ endOfBodyPosition = other.endOfBodyPosition;
+ endOfMessagePosition = other.endOfMessagePosition;
+ copied = other.copied;
- int limit = PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT;
-
- buffer.setIndex(limit, limit);
-
- // endOfBodyPosition = limit;
+ // We need to copy the underlying buffer too, since the different messsages thereafter might have different
+ // properties set on them, making their encoding different
+ buffer = other.buffer.copy(0, other.buffer.capacity());
+ buffer.setIndex(other.buffer.readerIndex(), other.buffer.writerIndex());
}
// Message implementation ----------------------------------------
public int getEncodeSize()
{
- int headersPropsSize = this.getHeadersAndPropertiesEncodeSize();
+ int headersPropsSize = getHeadersAndPropertiesEncodeSize();
- int bodyPos = this.endOfBodyPosition == -1 ? buffer.writerIndex() : this.endOfBodyPosition;
+ int bodyPos = endOfBodyPosition == -1 ? buffer.writerIndex() : endOfBodyPosition;
int bodySize = bodyPos - PacketImpl.PACKET_HEADERS_SIZE - DataConstants.SIZE_INT;
@@ -174,29 +197,48 @@
}
public void encodeHeadersAndProperties(final HornetQBuffer buffer)
- {
+ {
buffer.writeLong(messageID);
- buffer.writeSimpleString(destination);
+ buffer.writeSimpleString(destination);
buffer.writeByte(type);
buffer.writeBoolean(durable);
buffer.writeLong(expiration);
buffer.writeLong(timestamp);
- buffer.writeByte(priority);
- properties.encode(buffer);
+ buffer.writeByte(priority);
+ properties.encode(buffer);
}
public void decodeHeadersAndProperties(final HornetQBuffer buffer)
{
- messageID = buffer.readLong();
- destination = buffer.readSimpleString();
+ messageID = buffer.readLong();
+ destination = buffer.readSimpleString();
type = buffer.readByte();
durable = buffer.readBoolean();
expiration = buffer.readLong();
timestamp = buffer.readLong();
- priority = buffer.readByte();
+ priority = buffer.readByte();
properties.decode(buffer);
}
+ public HornetQBuffer getBodyBuffer()
+ {
+ if (bodyBuffer == null)
+ {
+ if (buffer instanceof LargeMessageBuffer == false)
+ {
+ bodyBuffer = new ResetLimitWrappedHornetQBuffer(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT,
+ buffer,
+ this);
+ }
+ else
+ {
+ return buffer;
+ }
+ }
+
+ return bodyBuffer;
+ }
+
public long getMessageID()
{
return messageID;
@@ -295,6 +337,99 @@
return map;
}
+ public void decodeFromBuffer(final HornetQBuffer buffer)
+ {
+ this.buffer = buffer;
+
+ decode();
+ }
+
+ public void bodyChanged()
+ {
+ // If the body is changed we must copy the buffer otherwise can affect the previously sent message
+ // which might be in the Netty write queue
+ checkCopy();
+
+ bufferValid = false;
+
+ endOfBodyPosition = -1;
+ }
+
+ public void checkCopy()
+ {
+ if (!copied)
+ {
+ forceCopy();
+
+ copied = true;
+ }
+ }
+
+ public void resetCopied()
+ {
+ copied = false;
+ }
+
+ public int getEndOfMessagePosition()
+ {
+ return endOfMessagePosition;
+ }
+
+ public int getEndOfBodyPosition()
+ {
+ return endOfBodyPosition;
+ }
+
+ // Encode to journal or paging
+ public void encode(final HornetQBuffer buff)
+ {
+ encodeToBuffer();
+
+ buff.writeBytes(buffer, PacketImpl.PACKET_HEADERS_SIZE, endOfMessagePosition - PacketImpl.PACKET_HEADERS_SIZE);
+ }
+
+ // Decode from journal or paging
+ public void decode(final HornetQBuffer buff)
+ {
+ int start = buff.readerIndex();
+
+ endOfBodyPosition = buff.readInt();
+
+ endOfMessagePosition = buff.getInt(endOfBodyPosition - PacketImpl.PACKET_HEADERS_SIZE + start);
+
+ int length = endOfMessagePosition - PacketImpl.PACKET_HEADERS_SIZE;
+
+ buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE);
+
+ buffer.writeBytes(buff, start, length);
+
+ decode();
+
+ buff.readerIndex(start + length);
+ }
+
+ public synchronized HornetQBuffer getEncodedBuffer()
+ {
+ HornetQBuffer buff = encodeToBuffer();
+
+ if (bufferUsed)
+ {
+ HornetQBuffer copied = buff.copy(0, buff.capacity());
+
+ copied.setIndex(0, endOfMessagePosition);
+
+ return copied;
+ }
+ else
+ {
+ buffer.setIndex(0, endOfMessagePosition);
+
+ bufferUsed = true;
+
+ return buffer;
+ }
+ }
+
// Properties
// ---------------------------------------------------------------------------------------
@@ -657,123 +792,68 @@
// Private -------------------------------------------------------
- // Inner classes -------------------------------------------------
-
- private class DecodingContext implements BodyEncoder
+ // This must be synchronized as it can be called concurrently id the message is being delivered concurently to
+ // many queues - the first caller in this case will actually encode it
+ private synchronized HornetQBuffer encodeToBuffer()
{
- private int lastPos = 0;
-
- public DecodingContext()
+ if (!bufferValid)
{
- }
+ if (bufferUsed)
+ {
+ // Cannot use same buffer - must copy
- public void open()
- {
- }
+ forceCopy();
+ }
- public void close()
- {
- }
-
- public int encode(ByteBuffer bufferRead) throws HornetQException
- {
- HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bufferRead);
- return encode(buffer, bufferRead.capacity());
- }
-
- public int encode(HornetQBuffer bufferOut, int size)
- {
- bufferOut.writeBytes(getWholeBuffer(), lastPos, size);
- lastPos += size;
- return size;
- }
- }
-
- protected ResetLimitWrappedHornetQBuffer bodyBuffer;
-
- public HornetQBuffer getBodyBuffer()
- {
- if (bodyBuffer == null)
- {
- if (buffer instanceof LargeMessageBuffer == false)
+ if (endOfBodyPosition == -1)
{
- bodyBuffer = new ResetLimitWrappedHornetQBuffer(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT,
- buffer,
- this);
+ // Means sending message for first time
+ endOfBodyPosition = buffer.writerIndex();
}
- else
- {
- return buffer;
- }
- }
- return bodyBuffer;
- }
+ // write it
+ buffer.setInt(PacketImpl.PACKET_HEADERS_SIZE, endOfBodyPosition);
- protected boolean bufferValid;
+ // Position at end of body and skip past the message end position int
+ buffer.setIndex(0, endOfBodyPosition + DataConstants.SIZE_INT);
- private int endOfBodyPosition = -1;
+ encodeHeadersAndProperties(buffer);
- private int endOfMessagePosition;
+ // Write end of message position
- private boolean copied = true;
+ endOfMessagePosition = buffer.writerIndex();
- /*
- * Copy constructor
- */
- protected MessageImpl(final MessageImpl other, final boolean shallow)
- {
- messageID = other.getMessageID();
- destination = other.getDestination();
- type = other.getType();
- durable = other.isDurable();
- expiration = other.getExpiration();
- timestamp = other.getTimestamp();
- priority = other.getPriority();
- properties = new TypedProperties(other.getProperties());
+ buffer.setInt(endOfBodyPosition, endOfMessagePosition);
- this.bufferValid = other.bufferValid;
- this.endOfBodyPosition = other.endOfBodyPosition;
- this.endOfMessagePosition = other.endOfMessagePosition;
- this.copied = other.copied;
+ bufferValid = true;
+ }
- if (shallow)
- {
- this.buffer = other.buffer;
- }
- else
- {
- // We need to copy the underlying buffer too, since the different messsages thereafter might have different
- // properties set on them, making their encoding different
- buffer = other.buffer.copy(0, other.buffer.capacity());
- buffer.setIndex(other.buffer.readerIndex(), other.buffer.writerIndex());
- }
+ return buffer;
}
- public void bodyChanged()
+ private void decode()
{
- // If the body is changed we must copy the buffer otherwise can affect the previously sent message
- // which might be in the Netty write queue
- checkCopy();
+ endOfBodyPosition = buffer.getInt(PacketImpl.PACKET_HEADERS_SIZE);
- bufferValid = false;
+ buffer.readerIndex(endOfBodyPosition + DataConstants.SIZE_INT);
- this.endOfBodyPosition = -1;
+ decodeHeadersAndProperties(buffer);
+
+ endOfMessagePosition = buffer.readerIndex();
+
+ bufferValid = true;
}
- public void checkCopy()
+ private void createBody(final int initialMessageBufferSize)
{
- if (!copied)
- {
- forceCopy();
+ buffer = HornetQBuffers.dynamicBuffer(initialMessageBufferSize);
- copied = true;
- }
- }
+ // There's a bug in netty which means a dynamic buffer won't resize until you write a byte
+ buffer.writeByte((byte)0);
- public void resetCopied()
- {
- copied = false;
+ int limit = PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT;
+
+ buffer.setIndex(limit, limit);
}
private void forceCopy()
@@ -782,102 +862,46 @@
buffer = buffer.copy(0, buffer.capacity());
- buffer.setIndex(0, this.endOfBodyPosition);
+ buffer.setIndex(0, endOfBodyPosition);
if (bodyBuffer != null)
{
bodyBuffer.setBuffer(buffer);
}
- }
- public int getEndOfMessagePosition()
- {
- return this.endOfMessagePosition;
+ bufferUsed = false;
}
- public int getEndOfBodyPosition()
- {
- return this.endOfBodyPosition;
- }
+ // Inner classes -------------------------------------------------
- // Encode to journal or paging
- public void encode(HornetQBuffer buff)
+ private final class DecodingContext implements BodyEncoder
{
- encodeToBuffer();
+ private int lastPos = 0;
- buff.writeBytes(buffer, PacketImpl.PACKET_HEADERS_SIZE, endOfMessagePosition - PacketImpl.PACKET_HEADERS_SIZE);
- }
+ public DecodingContext()
+ {
+ }
- // Decode from journal or paging
- public void decode(HornetQBuffer buff)
- {
- int start = buff.readerIndex();
+ public void open()
+ {
+ }
- endOfBodyPosition = buff.readInt();
+ public void close()
+ {
+ }
- endOfMessagePosition = buff.getInt(endOfBodyPosition - PacketImpl.PACKET_HEADERS_SIZE + start);
+ public int encode(final ByteBuffer bufferRead) throws HornetQException
+ {
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bufferRead);
+ return encode(buffer, bufferRead.capacity());
+ }
- int length = endOfMessagePosition - PacketImpl.PACKET_HEADERS_SIZE;
-
- buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE);
-
- buffer.writeBytes(buff, start, length);
-
- decode();
-
- buff.readerIndex(start + length);
- }
-
- // This must be synchronized as it can be called concurrently id the message is being delivered concurently to
- // many queues - the first caller in this case will actually encode it
- public synchronized HornetQBuffer encodeToBuffer()
- {
- if (!bufferValid)
+ public int encode(final HornetQBuffer bufferOut, final int size)
{
- if (endOfBodyPosition == -1)
- {
- // Means sending message for first time
- endOfBodyPosition = buffer.writerIndex();
- }
-
- // write it
- buffer.setInt(PacketImpl.PACKET_HEADERS_SIZE, endOfBodyPosition);
-
- // Position at end of body and skip past the message end position int
- buffer.setIndex(0, endOfBodyPosition + DataConstants.SIZE_INT);
-
- encodeHeadersAndProperties(buffer);
-
- // Write end of message position
-
- this.endOfMessagePosition = buffer.writerIndex();
-
- buffer.setInt(endOfBodyPosition, endOfMessagePosition);
-
- this.bufferValid = true;
+ bufferOut.writeBytes(getWholeBuffer(), lastPos, size);
+ lastPos += size;
+ return size;
}
-
- return buffer;
}
- public void decode()
- {
- this.endOfBodyPosition = buffer.getInt(PacketImpl.PACKET_HEADERS_SIZE);
-
- buffer.readerIndex(this.endOfBodyPosition + DataConstants.SIZE_INT);
-
- this.decodeHeadersAndProperties(buffer);
-
- this.endOfMessagePosition = buffer.readerIndex();
-
- this.bufferValid = true;
- }
-
- public void decodeFromBuffer(HornetQBuffer buffer)
- {
- this.buffer = buffer;
-
- decode();
- }
-
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -74,7 +74,7 @@
*/
private LargeServerMessageImpl(final LargeServerMessageImpl copy, final SequentialFile fileCopy, final long newID)
{
- super(copy, true);
+ super(copy);
this.linkMessage = copy;
storageManager = copy.storageManager;
file = fileCopy;
Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -235,7 +235,7 @@
size = buffer.readerIndex();
}
- public final int getPacketSize()
+ public int getPacketSize()
{
if (size == -1)
{
Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -69,19 +69,19 @@
@Override
public HornetQBuffer encode(final RemotingConnection connection)
{
- HornetQBuffer orig = message.encodeToBuffer();
+ HornetQBuffer buffer = message.getEncodedBuffer();
- //Now we must copy this buffer, before sending to Netty, as it could be concurrently delivered to many consumers
+ //Sanity check
+ if (buffer.writerIndex() != message.getEndOfMessagePosition())
+ {
+ throw new IllegalStateException("Wrong encode position");
+ }
- HornetQBuffer buffer = orig.copy(0, orig.capacity());
-
- buffer.setIndex(0, message.getEndOfMessagePosition());
-
- buffer.writeLong(consumerID);
+ buffer.writeLong(consumerID);
buffer.writeInt(deliveryCount);
-
+
size = buffer.writerIndex();
-
+
//Write standard headers
int len = size - DataConstants.SIZE_INT;
@@ -90,7 +90,7 @@
buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
//Position reader for reading by Netty
- buffer.readerIndex(0);
+ buffer.setIndex(0, size);
return buffer;
}
@@ -107,7 +107,7 @@
deliveryCount = buffer.readInt();
size = buffer.readerIndex();
-
+
//Need to position buffer for reading
buffer.setIndex(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, message.getEndOfBodyPosition());
Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -72,19 +72,16 @@
@Override
public HornetQBuffer encode(final RemotingConnection connection)
{
- //this isn't right when forwarding a message that has been already received - because writerindex will
- //be pointing at end of message
+ HornetQBuffer buffer = message.getEncodedBuffer();
- HornetQBuffer orig = message.encodeToBuffer();
+ //Sanity check
+ if (buffer.writerIndex() != message.getEndOfMessagePosition())
+ {
+ throw new IllegalStateException("Wrong encode position");
+ }
- //FIXME - for now we are copying due to concurrent sends to many bridges on the server
-
- HornetQBuffer buffer = orig.copy(0, orig.capacity());
-
- buffer.setIndex(0, message.getEndOfMessagePosition());
-
buffer.writeBoolean(requiresResponse);
-
+
size = buffer.writerIndex();
//Write standard headers
@@ -108,8 +105,12 @@
//Buffer comes in after having read standard headers and positioned at Beginning of body part
message.decodeFromBuffer(buffer);
-
- requiresResponse = buffer.readBoolean();
+
+ int ri = buffer.readerIndex();
+
+ requiresResponse = buffer.readBoolean();
+
+ buffer.readerIndex(ri);
}
// Private -------------------------------------------------------
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -43,8 +43,6 @@
ServerMessage copy();
- ServerMessage shallowCopy();
-
int getMemoryEstimate();
int getRefCount();
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -411,12 +411,12 @@
if (flowRecord != null)
{
- // We make a shallow copy of the message, then we strip out the unwanted routing id headers and leave
+ // We make a copy of the message, then we strip out the unwanted routing id headers and leave
// only
// the one pertinent for the destination node - this is important since different queues on different
// nodes could have same queue ids
// Note we must copy since same message may get routed to other nodes which require different headers
- message = message.shallowCopy();
+ message = message.copy();
// TODO - we can optimise this
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -185,9 +185,10 @@
public HandleStatus handle(final MessageReference ref) throws Exception
{
+ //log.info("handling message");
if (availableCredits != null && availableCredits.get() <= 0)
{
-
+ // log.info("busy");
return HandleStatus.BUSY;
}
@@ -417,7 +418,7 @@
public void receiveCredits(final int credits) throws Exception
{
-
+ // log.info("Receiving credits " + credits);
if (credits == -1)
{
// No flow control
@@ -591,8 +592,11 @@
if (availableCredits != null)
{
+ //log.info("Subtracting credits " + packet.getPacketSize());
availableCredits.addAndGet(-packet.getPacketSize());
}
+
+ // log.info("delivered message");
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -68,9 +68,9 @@
/*
* Copy constructor
*/
- protected ServerMessageImpl(final ServerMessageImpl other, final boolean shallow)
+ protected ServerMessageImpl(final ServerMessageImpl other)
{
- super(other, shallow);
+ super(other);
}
public void setMessageID(final long id)
@@ -164,7 +164,7 @@
public ServerMessage copy(final long newID)
{
- ServerMessage m = new ServerMessageImpl(this, false);
+ ServerMessage m = new ServerMessageImpl(this);
m.setMessageID(newID);
@@ -173,14 +173,9 @@
public ServerMessage copy()
{
- return new ServerMessageImpl(this, false);
+ return new ServerMessageImpl(this);
}
- public ServerMessage shallowCopy()
- {
- return new ServerMessageImpl(this, true);
- }
-
public ServerMessage makeCopyForExpiryOrDLA(final long newID, final boolean expiry) throws Exception
{
/*
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -1574,6 +1574,8 @@
final CreditManagerHolder holder = this.getCreditManagerHolder(address);
int credits = packet.getCredits();
+
+ //log.info("requesting credits " + credits);
int gotCredits = holder.manager.acquireCredits(credits, new CreditsAvailableRunnable()
{
@@ -1594,6 +1596,8 @@
}
}
});
+
+ //log.info("got credits " + gotCredits);
if (gotCredits > 0)
{
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -71,7 +71,7 @@
server.start();
ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(connectorFactoryClassName));
- sf.setConsumerWindowSize(0);
+ // sf.setConsumerWindowSize(0);
ClientSession session = sf.createSession(false, true, true);
@@ -103,7 +103,7 @@
}
log.info("sent messages");
-
+
ClientConsumer consumer = session.createConsumer(QUEUE);
session.start();
@@ -112,6 +112,8 @@
{
ClientMessage message2 = consumer.receive();
+ // log.info("got message " + i);
+
HornetQBuffer buffer = message2.getBodyBuffer();
assertEquals("testINVMCoreClient", buffer.readString());
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -67,7 +67,7 @@
public void testFlowControlMultipleConsumers() throws Exception
{
- testFlowControl(1000, 500, 10 * 1024, 1024, 1024, 1024, 5, 1, 0, false);
+ testFlowControl(1000, 500, -1, 1024, 1024, 1024, 5, 1, 0, false);
}
public void testFlowControlZeroConsumerWindowSize() throws Exception
@@ -134,7 +134,7 @@
{
testFlowControl(1000, 10000, 100 * 1024, 1024, 1024, 1024, 1, 1, 0, true, 1000, true);
}
-
+
public void testFlowControlLargeMessages7() throws Exception
{
testFlowControl(1000, 10000, 100 * 1024, 1024, 1024, 1024, 2, 2, 0, true, 1000, true);
@@ -212,7 +212,7 @@
{
session.createQueue(address, new SimpleString(queueName + i), null, false);
}
-
+
final byte[] bytes = RandomUtil.randomBytes(messageSize);
class MyHandler implements MessageHandler
@@ -230,7 +230,7 @@
byte[] bytesRead = new byte[messageSize];
message.getBodyBuffer().readBytes(bytesRead);
-
+
assertEqualsByteArrays(bytes, bytesRead);
message.acknowledge();
@@ -244,13 +244,14 @@
{
Thread.sleep(consumerDelay);
}
+
}
catch (Exception e)
{
log.error("Failed to handle message", e);
-
+
this.exception = e;
-
+
latch.countDown();
}
}
@@ -285,12 +286,10 @@
long start = System.currentTimeMillis();
-
-
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createClientMessage(false);
-
+
message.getBodyBuffer().writeBytes(bytes);
for (int j = 0; j < numProducers; j++)
@@ -389,7 +388,7 @@
byte[] bytes = new byte[0];
ClientMessage message = session.createClientMessage(false);
-
+
message.getBodyBuffer().writeBytes(bytes);
producer.send(message);
@@ -421,7 +420,7 @@
t.start();
ClientMessage message2 = session.createClientMessage(false);
-
+
message2.getBodyBuffer().writeBytes(bytes);
producer2.send(message2);
@@ -474,7 +473,7 @@
byte[] bytes = new byte[0];
ClientMessage message = session.createClientMessage(false);
-
+
message.getBodyBuffer().writeBytes(bytes);
producer.send(message);
@@ -504,7 +503,7 @@
assertEquals(1, waiting);
message = session.createClientMessage(false);
-
+
message.getBodyBuffer().writeBytes(bytes);
producer2.send(message);
@@ -552,7 +551,7 @@
byte[] bytes = new byte[0];
ClientMessage message = session.createClientMessage(false);
-
+
message.getBodyBuffer().writeBytes(bytes);
producer.send(message);
@@ -600,7 +599,7 @@
session.close();
message = session.createClientMessage(false);
-
+
message.getBodyBuffer().writeBytes(bytes);
producer3.send(message);
@@ -648,7 +647,7 @@
byte[] bytes = new byte[2000];
ClientMessage message = session.createClientMessage(false);
-
+
message.getBodyBuffer().writeBytes(bytes);
final AtomicBoolean closed = new AtomicBoolean(false);
@@ -721,7 +720,7 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createClientMessage(false);
-
+
message.getBodyBuffer().writeBytes(bytes);
producer.send(message);
@@ -738,4 +737,72 @@
server.stop();
}
+ //Not technically a flow control test, but what the hell
+ public void testMultipleConsumers() throws Exception
+ {
+ HornetQServer server = createServer(false, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(isNetty());
+
+ final ClientSession session = sf.createSession(false, true, true, true);
+
+ session.createQueue("address", "queue1", null, false);
+ session.createQueue("address", "queue2", null, false);
+ session.createQueue("address", "queue3", null, false);
+ session.createQueue("address", "queue4", null, false);
+ session.createQueue("address", "queue5", null, false);
+
+ ClientConsumer consumer1 = session.createConsumer("queue1");
+ ClientConsumer consumer2 = session.createConsumer("queue2");
+ ClientConsumer consumer3 = session.createConsumer("queue3");
+ ClientConsumer consumer4 = session.createConsumer("queue4");
+ ClientConsumer consumer5 = session.createConsumer("queue5");
+
+ ClientProducer producer = session.createProducer("address");
+
+ byte[] bytes = new byte[2000];
+
+ ClientMessage message = session.createClientMessage(false);
+
+ message.getBodyBuffer().writeBytes(bytes);
+
+ final int numMessages = 1000;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ producer.send(message);
+ }
+
+ session.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage msg = consumer1.receive(1000);
+
+ assertNotNull(msg);
+
+ msg = consumer2.receive(5000);
+
+ assertNotNull(msg);
+
+ msg = consumer3.receive(5000);
+
+ assertNotNull(msg);
+
+ msg = consumer4.receive(5000);
+
+ assertNotNull(msg);
+
+ msg = consumer5.receive(5000);
+
+ assertNotNull(msg);
+ }
+
+ session.close();
+
+ server.stop();
+ }
+
}
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-25 20:11:08 UTC (rev 8403)
@@ -88,14 +88,13 @@
checkFreePort(PORTS);
clearData();
-
+
consumers = new ConsumerHolder[MAX_CONSUMERS];
servers = new HornetQServer[MAX_SERVERS];
sfs = new ClientSessionFactory[MAX_SERVERS];
-
}
@Override
@@ -207,15 +206,15 @@
final int consumerCount,
final boolean local) throws Exception
{
-// System.out.println("waiting for bindings on node " + node +
-// " address " +
-// address +
-// " count " +
-// count +
-// " consumerCount " +
-// consumerCount +
-// " local " +
-// local);
+ // System.out.println("waiting for bindings on node " + node +
+ // " address " +
+ // address +
+ // " count " +
+ // count +
+ // " consumerCount " +
+ // consumerCount +
+ // " local " +
+ // local);
HornetQServer server = this.servers[node];
if (server == null)
@@ -442,7 +441,8 @@
producer.send(message);
}
- } finally
+ }
+ finally
{
session.close();
}
@@ -676,7 +676,9 @@
if (j != (Integer)(message.getObjectProperty(COUNT_PROP)))
{
outOfOrder = true;
- System.out.println("Message j=" + j + " was received out of order = " + message.getObjectProperty(COUNT_PROP));
+ System.out.println("Message j=" + j +
+ " was received out of order = " +
+ message.getObjectProperty(COUNT_PROP));
}
}
}
@@ -823,7 +825,7 @@
message.acknowledge();
}
- log.info("consumer " + consumerIDs[i] +" returns " + count);
+ log.info("consumer " + consumerIDs[i] + " returns " + count);
}
else
{
@@ -1165,8 +1167,6 @@
Map<String, Object> params = generateParams(node, netty);
-
-
if (netty)
{
TransportConfiguration nettytc = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
@@ -1175,7 +1175,7 @@
else
{
TransportConfiguration invmtc = new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params);
- configuration.getAcceptorConfigurations().add(invmtc);
+ configuration.getAcceptorConfigurations().add(invmtc);
}
HornetQServer server;
More information about the hornetq-commits
mailing list