Author: timfox
Date: 2009-11-24 19:45:52 -0500 (Tue, 24 Nov 2009)
New Revision: 8396
Modified:
branches/20-optimisation/.classpath
branches/20-optimisation/build-hornetq.xml
branches/20-optimisation/examples/jms/interceptor/src/org/hornetq/jms/example/SimpleInterceptor.java
branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQBuffer.java
branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ChannelBufferWrapper.java
branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/20-optimisation/src/main/org/hornetq/utils/UTF8Util.java
branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java
branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/AckBatchSizeTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
optimisation
Modified: branches/20-optimisation/.classpath
===================================================================
--- branches/20-optimisation/.classpath 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/.classpath 2009-11-25 00:45:52 UTC (rev 8396)
@@ -100,7 +100,7 @@
<classpathentry kind="con"
path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="tests/tmpfiles"/>
<classpathentry kind="lib"
path="thirdparty/net/java/dev/javacc/lib/javacc.jar"/>
- <classpathentry kind="lib"
path="thirdparty/org/jboss/netty/lib/netty.jar"/>
+ <classpathentry kind="lib"
path="thirdparty/org/jboss/netty/lib/netty.jar"
sourcepath="/home/tim/workspace/netty-3.1.5.GA/src/main"/>
<classpathentry kind="lib"
path="thirdparty/log4j/lib/log4j.jar"/>
<classpathentry kind="lib"
path="thirdparty/org/jboss/naming/lib/jnpserver.jar"/>
<classpathentry kind="lib"
path="thirdparty/org/jboss/security/lib/jbosssx.jar"/>
Modified: branches/20-optimisation/build-hornetq.xml
===================================================================
--- branches/20-optimisation/build-hornetq.xml 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/build-hornetq.xml 2009-11-25 00:45:52 UTC (rev 8396)
@@ -172,6 +172,10 @@
<!-- Classpath definition
-->
<!--
========================================================================================
-->
+ <path id="core.compilation.classpath">
+ <path refid="org.jboss.netty.classpath"/>
+ </path>
+
<path id="jms.compilation.classpath">
<path location="${build.core.classes.dir}"/>
<path refid="org.jboss.javaee.classpath"/>
@@ -401,6 +405,7 @@
</src>
<include name="**/hornetq/core/**/*.java"/>
<include name="**/hornetq/utils/**/*.java"/>
+ <classpath refid="core.compilation.classpath"/>
</javac>
<javah class="org.hornetq.core.asyncio.impl.AsynchronousFileImpl"
classpath="${build.core.classes.dir}"
destdir="./native/src"/>
Modified:
branches/20-optimisation/examples/jms/interceptor/src/org/hornetq/jms/example/SimpleInterceptor.java
===================================================================
---
branches/20-optimisation/examples/jms/interceptor/src/org/hornetq/jms/example/SimpleInterceptor.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/examples/jms/interceptor/src/org/hornetq/jms/example/SimpleInterceptor.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -38,7 +38,7 @@
if (packet instanceof SessionSendMessage)
{
SessionSendMessage realPacket = (SessionSendMessage)packet;
- Message msg = realPacket.getServerMessage();
+ Message msg = realPacket.getMessage();
msg.putStringProperty(new SimpleString("newproperty"), new
SimpleString("Hello from interceptor!"));
}
//We return true which means "call next interceptor" (if there is one) or
target.
Modified: branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQBuffer.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQBuffer.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQBuffer.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -207,7 +207,7 @@
String readString();
- String readUTF() throws Exception;
+ String readUTF();
void writeBoolean(boolean val);
@@ -219,5 +219,5 @@
void writeString(String val);
- void writeUTF(String utf) throws Exception;
+ void writeUTF(String utf);
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ChannelBufferWrapper.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ChannelBufferWrapper.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ChannelBufferWrapper.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -37,7 +37,7 @@
protected ChannelBuffer buffer;
public ChannelBufferWrapper(final ChannelBuffer buffer)
- {
+ {
this.buffer = buffer;
}
@@ -48,69 +48,79 @@
public SimpleString readNullableSimpleString()
{
- int b = readByte();
+ int b = buffer.readByte();
if (b == DataConstants.NULL)
{
return null;
}
else
{
- return readSimpleString();
+ return readSimpleStringInternal();
}
}
public String readNullableString()
{
- int b = readByte();
+ int b = buffer.readByte();
if (b == DataConstants.NULL)
{
return null;
}
else
{
- return readString();
+ return readStringInternal();
}
}
public SimpleString readSimpleString()
{
- int len = readInt();
+ return readSimpleStringInternal();
+ }
+
+ private SimpleString readSimpleStringInternal()
+ {
+ int len = buffer.readInt();
byte[] data = new byte[len];
- readBytes(data);
+ buffer.readBytes(data);
return new SimpleString(data);
}
public String readString()
{
- int len = readInt();
+ return readStringInternal();
+ }
+
+ private String readStringInternal()
+ {
+ int len = buffer.readInt();
char[] chars = new char[len];
for (int i = 0; i < len; i++)
{
- chars[i] = (char)readShort();
+ chars[i] = (char)buffer.readShort();
}
return new String(chars);
}
- public String readUTF() throws Exception
+ public String readUTF()
{
return UTF8Util.readUTF(this);
}
public void writeBoolean(final boolean val)
{
- writeByte((byte)(val ? -1 : 0));
+ buffer.writeByte((byte)(val ? -1 : 0));
}
public void writeNullableSimpleString(final SimpleString val)
{
if (val == null)
{
- writeByte(DataConstants.NULL);
+ buffer.writeByte(DataConstants.NULL);
}
else
{
- writeByte(DataConstants.NOT_NULL);
- writeSimpleString(val);
+ buffer.writeByte(DataConstants.NOT_NULL);
+ writeSimpleStringInternal(val);
}
}
@@ -118,32 +128,42 @@
{
if (val == null)
{
- writeByte(DataConstants.NULL);
+ buffer.writeByte(DataConstants.NULL);
}
else
{
- writeByte(DataConstants.NOT_NULL);
- writeString(val);
+ buffer.writeByte(DataConstants.NOT_NULL);
+ writeStringInternal(val);
}
}
public void writeSimpleString(final SimpleString val)
{
+ writeSimpleStringInternal(val);
+ }
+
+ private void writeSimpleStringInternal(final SimpleString val)
+ {
byte[] data = val.getData();
- writeInt(data.length);
- writeBytes(data);
+ buffer.writeInt(data.length);
+ buffer.writeBytes(data);
}
public void writeString(final String val)
{
- writeInt(val.length());
+ writeStringInternal(val);
+ }
+
+ private void writeStringInternal(final String val)
+ {
+ buffer.writeInt(val.length());
for (int i = 0; i < val.length(); i++)
{
- writeShort((short)val.charAt(i));
+ buffer.writeShort((short)val.charAt(i));
}
}
- public void writeUTF(final String utf) throws Exception
+ public void writeUTF(final String utf)
{
UTF8Util.saveUTF(this, utf);
}
@@ -154,7 +174,6 @@
return buffer.capacity();
}
-
public ChannelBuffer channelBuffer()
{
return buffer;
Modified:
branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -13,8 +13,12 @@
package org.hornetq.core.buffers.impl;
+import java.nio.ByteBuffer;
+
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.Message;
+import org.hornetq.utils.SimpleString;
/**
* A ResetLimitWrappedHornetQBuffer
@@ -27,8 +31,11 @@
private static final Logger log =
Logger.getLogger(ResetLimitWrappedHornetQBuffer.class);
private final int limit;
-
- public ResetLimitWrappedHornetQBuffer(final int limit, final HornetQBuffer buffer)
+
+ private Message message;
+
+ public ResetLimitWrappedHornetQBuffer(final int limit, final HornetQBuffer buffer,
+ final Message message)
{
super(buffer.channelBuffer());
@@ -40,7 +47,14 @@
}
readerIndex(limit);
+
+ this.message = message;
}
+
+ private void changed()
+ {
+ message.bodyChanged();
+ }
public void setBuffer(HornetQBuffer buffer)
{
@@ -49,9 +63,12 @@
public void clear()
{
+ changed();
+
buffer.clear();
buffer.setIndex(limit, limit);
+
}
public void readerIndex(int readerIndex)
@@ -71,11 +88,15 @@
public void resetWriterIndex()
{
+ changed();
+
buffer.writerIndex(limit);
}
public void setIndex(int readerIndex, int writerIndex)
{
+ changed();
+
if (readerIndex < limit)
{
readerIndex = limit;
@@ -89,10 +110,261 @@
public void writerIndex(int writerIndex)
{
+ changed();
+
if (writerIndex < limit)
{
writerIndex = limit;
}
+
buffer.writerIndex(writerIndex);
}
+
+ @Override
+ public void setByte(int index, byte value)
+ {
+ changed();
+
+ super.setByte(index, value);
+ }
+
+ @Override
+ public void setBytes(int index, byte[] src, int srcIndex, int length)
+ {
+ changed();
+
+ super.setBytes(index, src, srcIndex, length);
+ }
+
+ @Override
+ public void setBytes(int index, byte[] src)
+ {
+ changed();
+
+ super.setBytes(index, src);
+ }
+
+ @Override
+ public void setBytes(int index, ByteBuffer src)
+ {
+ changed();
+
+ super.setBytes(index, src);
+ }
+
+ @Override
+ public void setBytes(int index, HornetQBuffer src, int srcIndex, int length)
+ {
+ changed();
+
+ super.setBytes(index, src, srcIndex, length);
+ }
+
+ @Override
+ public void setBytes(int index, HornetQBuffer src, int length)
+ {
+ changed();
+
+ super.setBytes(index, src, length);
+ }
+
+ @Override
+ public void setBytes(int index, HornetQBuffer src)
+ {
+ changed();
+
+ super.setBytes(index, src);
+ }
+
+ @Override
+ public void setChar(int index, char value)
+ {
+ changed();
+
+ super.setChar(index, value);
+ }
+
+ @Override
+ public void setDouble(int index, double value)
+ {
+ changed();
+
+ super.setDouble(index, value);
+ }
+
+ @Override
+ public void setFloat(int index, float value)
+ {
+ changed();
+
+ super.setFloat(index, value);
+ }
+
+ @Override
+ public void setInt(int index, int value)
+ {
+ changed();
+
+ super.setInt(index, value);
+ }
+
+ @Override
+ public void setLong(int index, long value)
+ {
+ changed();
+
+ super.setLong(index, value);
+ }
+
+ @Override
+ public void setShort(int index, short value)
+ {
+ changed();
+
+ super.setShort(index, value);
+ }
+
+ @Override
+ public void writeBoolean(boolean val)
+ {
+ changed();
+
+ super.writeBoolean(val);
+ }
+
+ @Override
+ public void writeByte(byte value)
+ {
+ changed();
+
+ super.writeByte(value);
+ }
+
+ @Override
+ public void writeBytes(byte[] src, int srcIndex, int length)
+ {
+ changed();
+
+ super.writeBytes(src, srcIndex, length);
+ }
+
+ @Override
+ public void writeBytes(byte[] src)
+ {
+ changed();
+
+ super.writeBytes(src);
+ }
+
+ @Override
+ public void writeBytes(ByteBuffer src)
+ {
+ changed();
+
+ super.writeBytes(src);
+ }
+
+ @Override
+ public void writeBytes(HornetQBuffer src, int srcIndex, int length)
+ {
+ changed();
+
+ super.writeBytes(src, srcIndex, length);
+ }
+
+ @Override
+ public void writeBytes(HornetQBuffer src, int length)
+ {
+ changed();
+
+ super.writeBytes(src, length);
+ }
+
+ @Override
+ public void writeChar(char chr)
+ {
+ changed();
+
+ super.writeChar(chr);
+ }
+
+ @Override
+ public void writeDouble(double value)
+ {
+ changed();
+
+ super.writeDouble(value);
+ }
+
+ @Override
+ public void writeFloat(float value)
+ {
+ changed();
+
+ super.writeFloat(value);
+ }
+
+ @Override
+ public void writeInt(int value)
+ {
+ changed();
+
+ super.writeInt(value);
+ }
+
+ @Override
+ public void writeLong(long value)
+ {
+ changed();
+
+ super.writeLong(value);
+ }
+
+ @Override
+ public void writeNullableSimpleString(SimpleString val)
+ {
+ changed();
+
+ super.writeNullableSimpleString(val);
+ }
+
+ @Override
+ public void writeNullableString(String val)
+ {
+ changed();
+
+ super.writeNullableString(val);
+ }
+
+ @Override
+ public void writeShort(short value)
+ {
+ changed();
+
+ super.writeShort(value);
+ }
+
+ @Override
+ public void writeSimpleString(SimpleString val)
+ {
+ changed();
+
+ super.writeSimpleString(val);
+ }
+
+ @Override
+ public void writeString(String val)
+ {
+ changed();
+
+ super.writeString(val);
+ }
+
+ @Override
+ public void writeUTF(String utf)
+ {
+ changed();
+
+ super.writeUTF(utf);
+ }
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -216,7 +216,7 @@
// we only force delivery once per call to receive
if (!deliveryForced)
- {
+ {
session.forceDelivery(id, forceDeliveryCount.incrementAndGet());
deliveryForced = true;
@@ -249,7 +249,7 @@
session.workDone();
if (m.containsProperty(FORCED_DELIVERY_MESSAGE))
- {
+ {
long seq = m.getLongProperty(FORCED_DELIVERY_MESSAGE);
if (seq >= forceDeliveryCount.longValue())
{
@@ -451,7 +451,7 @@
// This is ok - we just ignore the message
return;
}
-
+
ClientMessageInternal messageToHandle = message;
messageToHandle.onReceipt(this);
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -239,5 +239,16 @@
{
this.bodyInputStream = bodyInputStream;
}
+
+ public void setBuffer(HornetQBuffer buffer)
+ {
+ this.buffer = buffer;
+
+ if (bodyBuffer != null)
+ {
+ bodyBuffer.setBuffer(buffer);
+ }
+ }
+
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -34,13 +34,13 @@
void setFlowControlSize(int flowControlSize);
void onReceipt(ClientConsumerInternal consumer);
-
+
void setLargeMessage(boolean largeMessage);
/**
* Discard unused packets (used on large-message)
*/
- void discardLargeBody();
-
+ void discardLargeBody();
+
void setBuffer(HornetQBuffer buffer);
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -689,6 +689,8 @@
if (consumer != null)
{
ClientMessageInternal clMessage = (ClientMessageInternal)message.getMessage();
+
+ clMessage.setDeliveryCount(message.getDeliveryCount());
if (trace)
{
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -68,7 +68,7 @@
break;
}
case SESS_RECEIVE_MSG:
- {
+ {
SessionReceiveMessage message = (SessionReceiveMessage) packet;
clientSession.handleReceiveMessage(message.getConsumerID(), message);
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -1098,7 +1098,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.remoting.spi.HornetQBuffer#readUTF()
*/
- public String readUTF() throws Exception
+ public String readUTF()
{
return UTF8Util.readUTF(this);
}
@@ -1172,7 +1172,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.remoting.spi.HornetQBuffer#writeUTF(java.lang.String)
*/
- public void writeUTF(final String utf) throws Exception
+ public void writeUTF(final String utf)
{
throw new IllegalAccessError(READ_ONLY_ERROR_MESSAGE);
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -420,7 +420,7 @@
bb.writeInt(fileId);
bb.writeLong(id);
bb.writeInt(record.getEncodeSize());
- bb.writeByte(recordType);
+ bb.writeByte(recordType);
record.encode(bb);
bb.writeInt(size);
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/Message.java 2009-11-24
19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/Message.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -69,16 +69,27 @@
HornetQBuffer getBodyBuffer();
+
+ //Should the following methods really be on the public API?
+
void decodeFromBuffer(HornetQBuffer buffer);
HornetQBuffer encodeToBuffer();
int getEndOfMessagePosition();
+ // void setEndOfBodyPosition();
+
int getEndOfBodyPosition();
- void forceCopy();
+ void checkCopy();
+ void bodyChanged();
+
+ void resetCopied();
+
+// void resetEndOfBodyPosition();
+
// Properties
// ------------------------------------------------------------------
Modified:
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -15,6 +15,7 @@
import static org.hornetq.utils.DataConstants.SIZE_BOOLEAN;
import static org.hornetq.utils.DataConstants.SIZE_BYTE;
+import static org.hornetq.utils.DataConstants.SIZE_INT;
import static org.hornetq.utils.DataConstants.SIZE_LONG;
import java.nio.ByteBuffer;
@@ -73,7 +74,7 @@
public static final SimpleString HDR_FROM_CLUSTER = new
SimpleString("_HQ_FROM_CLUSTER");
public static final SimpleString HDR_LAST_VALUE_NAME = new
SimpleString("_HQ_LVQ_NAME");
-
+
// Attributes ----------------------------------------------------
protected long messageID;
@@ -94,9 +95,9 @@
protected byte priority;
protected HornetQBuffer buffer;
-
- private int encodeSize;
-
+
+ // private int encodeSize;
+
// Constructors --------------------------------------------------
protected MessageImpl()
@@ -126,61 +127,76 @@
this.expiration = expiration;
this.timestamp = timestamp;
this.priority = priority;
- createBody(initialMessageBufferSize);
+ createBody(initialMessageBufferSize);
}
-
+
protected MessageImpl(final long messageID, final int initialMessageBufferSize)
{
this();
this.messageID = messageID;
createBody(initialMessageBufferSize);
}
-
+
private void createBody(final int initialMessageBufferSize)
{
buffer = HornetQChannelBuffers.dynamicBuffer(initialMessageBufferSize);
-
- //There's a bug in netty which means a dynamic buffer won't resize until
you write a byte
+
+ // There's a bug in netty which means a dynamic buffer won't resize until
you write a byte
buffer.writeByte((byte)0);
-
+
int limit = PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT;
-
+
buffer.setIndex(limit, limit);
-
- endOfBodyPosition = -1;
+
+ //endOfBodyPosition = limit;
}
// Message implementation ----------------------------------------
public int getEncodeSize()
+ {
+ int headersPropsSize = this.getHeadersAndPropertiesEncodeSize();
+
+ int bodyPos = this.endOfBodyPosition == -1 ? buffer.writerIndex() :
this.endOfBodyPosition;
+
+ int bodySize = bodyPos - PacketImpl.PACKET_HEADERS_SIZE - DataConstants.SIZE_INT;
+
+ return SIZE_INT + bodySize + SIZE_INT + headersPropsSize;
+ }
+
+ public int getHeadersAndPropertiesEncodeSize()
{
- return encodeSize;
+ return SIZE_LONG + /* Destination */SimpleString.sizeofString(destination) +
+ /* Type */SIZE_BYTE +
+ /* Durable */SIZE_BOOLEAN +
+ /* Expiration */SIZE_LONG +
+ /* Timestamp */SIZE_LONG +
+ /* Priority */SIZE_BYTE +
+ /* PropertySize and Properties */properties.getEncodeSize();
}
-
+
public void encodeHeadersAndProperties(final HornetQBuffer buffer)
- {
- buffer.writeLong(messageID);
- buffer.writeSimpleString(destination);
+ {
+ buffer.writeLong(messageID);
+ buffer.writeSimpleString(destination);
buffer.writeByte(type);
buffer.writeBoolean(durable);
buffer.writeLong(expiration);
buffer.writeLong(timestamp);
buffer.writeByte(priority);
- properties.encode(buffer);
- encodeSize = buffer.writerIndex() - PacketImpl.PACKET_HEADERS_SIZE;
+ properties.encode(buffer);
}
-
+
public void decodeHeadersAndProperties(final HornetQBuffer buffer)
- {
- messageID = buffer.readLong();
- destination = buffer.readSimpleString();
+ {
+ messageID = buffer.readLong();
+ destination = buffer.readSimpleString();
type = buffer.readByte();
durable = buffer.readBoolean();
expiration = buffer.readLong();
timestamp = buffer.readLong();
priority = buffer.readByte();
- properties.decode(buffer);
- encodeSize = buffer.readerIndex() - PacketImpl.PACKET_HEADERS_SIZE;
+ properties.decode(buffer);
}
public long getMessageID()
@@ -196,6 +212,8 @@
public void setDestination(final SimpleString destination)
{
this.destination = destination;
+
+ bufferValid = false;
}
public byte getType()
@@ -211,6 +229,8 @@
public void setDurable(final boolean durable)
{
this.durable = durable;
+
+ bufferValid = false;
}
public long getExpiration()
@@ -221,6 +241,8 @@
public void setExpiration(final long expiration)
{
this.expiration = expiration;
+
+ bufferValid = false;
}
public long getTimestamp()
@@ -231,6 +253,8 @@
public void setTimestamp(final long timestamp)
{
this.timestamp = timestamp;
+
+ bufferValid = false;
}
public byte getPriority()
@@ -241,6 +265,8 @@
public void setPriority(final byte priority)
{
this.priority = priority;
+
+ bufferValid = false;
}
public boolean isExpired()
@@ -252,7 +278,7 @@
return System.currentTimeMillis() - expiration >= 0;
}
-
+
public Map<String, Object> toMap()
{
Map<String, Object> map = new HashMap<String, Object>();
@@ -277,46 +303,64 @@
public void putBooleanProperty(final SimpleString key, final boolean value)
{
properties.putBooleanProperty(key, value);
+
+ bufferValid = false;
}
public void putByteProperty(final SimpleString key, final byte value)
{
properties.putByteProperty(key, value);
+
+ bufferValid = false;
}
public void putBytesProperty(final SimpleString key, final byte[] value)
{
properties.putBytesProperty(key, value);
+
+ bufferValid = false;
}
public void putShortProperty(final SimpleString key, final short value)
{
properties.putShortProperty(key, value);
+
+ bufferValid = false;
}
public void putIntProperty(final SimpleString key, final int value)
{
properties.putIntProperty(key, value);
+
+ bufferValid = false;
}
public void putLongProperty(final SimpleString key, final long value)
{
properties.putLongProperty(key, value);
+
+ bufferValid = false;
}
public void putFloatProperty(final SimpleString key, final float value)
{
properties.putFloatProperty(key, value);
+
+ bufferValid = false;
}
public void putDoubleProperty(final SimpleString key, final double value)
{
properties.putDoubleProperty(key, value);
+
+ bufferValid = false;
}
public void putStringProperty(final SimpleString key, final SimpleString value)
{
properties.putSimpleStringProperty(key, value);
+
+ bufferValid = false;
}
public void putObjectProperty(final SimpleString key, final Object value) throws
PropertyConversionException
@@ -324,10 +368,10 @@
if (value == null)
{
// This is ok - when we try to read the same key it will return null too
- return;
+
+ properties.removeProperty(key);
}
-
- if (value instanceof Boolean)
+ else if (value instanceof Boolean)
{
properties.putBooleanProperty(key, (Boolean)value);
}
@@ -363,61 +407,85 @@
{
throw new PropertyConversionException(value.getClass() + " is not a valid
property type");
}
+
+ bufferValid = false;
}
public void putObjectProperty(final String key, final Object value) throws
PropertyConversionException
{
putObjectProperty(new SimpleString(key), value);
+
+ bufferValid = false;
}
public void putBooleanProperty(final String key, final boolean value)
{
properties.putBooleanProperty(new SimpleString(key), value);
+
+ bufferValid = false;
}
public void putByteProperty(final String key, final byte value)
{
properties.putByteProperty(new SimpleString(key), value);
+
+ bufferValid = false;
}
public void putBytesProperty(final String key, final byte[] value)
{
properties.putBytesProperty(new SimpleString(key), value);
+
+ bufferValid = false;
}
public void putShortProperty(final String key, final short value)
{
properties.putShortProperty(new SimpleString(key), value);
+
+ bufferValid = false;
}
public void putIntProperty(final String key, final int value)
{
properties.putIntProperty(new SimpleString(key), value);
+
+ bufferValid = false;
}
public void putLongProperty(final String key, final long value)
{
properties.putLongProperty(new SimpleString(key), value);
+
+ bufferValid = false;
}
public void putFloatProperty(final String key, final float value)
{
properties.putFloatProperty(new SimpleString(key), value);
+
+ bufferValid = false;
}
public void putDoubleProperty(final String key, final double value)
{
properties.putDoubleProperty(new SimpleString(key), value);
+
+ bufferValid = false;
}
public void putStringProperty(final String key, final String value)
{
properties.putSimpleStringProperty(new SimpleString(key), new
SimpleString(value));
+
+ bufferValid = false;
}
public void putTypedProperties(final TypedProperties otherProps)
{
properties.putTypedProperties(otherProps);
+
+ bufferValid = false;
}
public Object getObjectProperty(final SimpleString key)
@@ -541,11 +609,15 @@
public Object removeProperty(final SimpleString key)
{
+ bufferValid = false;
+
return properties.removeProperty(key);
}
public Object removeProperty(final String key)
{
+ bufferValid = false;
+
return properties.removeProperty(new SimpleString(key));
}
@@ -573,32 +645,22 @@
{
return buffer;
}
-
- public void setBuffer(HornetQBuffer buffer)
- {
- this.buffer = buffer;
-
- if (bodyBuffer != null)
- {
- bodyBuffer.setBuffer(buffer);
- }
- }
public BodyEncoder getBodyEncoder()
{
return new DecodingContext();
}
-
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
-
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
-
+
private class DecodingContext implements BodyEncoder
{
private int lastPos = 0;
@@ -628,151 +690,185 @@
return size;
}
}
-
- //FIXME - all this stuff only used by large messages, move it!
-
- public int getHeadersAndPropertiesEncodeSize()
- {
- return SIZE_LONG + /* Destination */SimpleString.sizeofString(destination) +
- /* Type */SIZE_BYTE +
- /* Durable */SIZE_BOOLEAN +
- /* Expiration */SIZE_LONG +
- /* Timestamp */SIZE_LONG +
- /* Priority */SIZE_BYTE +
- /* PropertySize and Properties */properties.getEncodeSize();
- }
-
-
-
-
-
- private ResetLimitWrappedHornetQBuffer bodyBuffer;
+
+ protected ResetLimitWrappedHornetQBuffer bodyBuffer;
+
public HornetQBuffer getBodyBuffer()
{
if (bodyBuffer == null)
{
if (buffer instanceof LargeMessageBuffer == false)
{
- bodyBuffer = new
ResetLimitWrappedHornetQBuffer(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT,
buffer);
+ bodyBuffer = new
ResetLimitWrappedHornetQBuffer(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT,
+ buffer,
+ this);
}
else
{
return buffer;
}
}
-
+
return bodyBuffer;
}
-
-
-
+
protected boolean bufferValid;
- private int endOfBodyPosition;
-
+ private int endOfBodyPosition = -1;
+
private int endOfMessagePosition;
+
+ private boolean copied = true;
+
+ /*
+ * Copy constructor
+ */
+ protected MessageImpl(final MessageImpl other)
+ {
+ messageID = other.getMessageID();
+ destination = other.getDestination();
+ type = other.getType();
+ durable = other.isDurable();
+ expiration = other.getExpiration();
+ timestamp = other.getTimestamp();
+ priority = other.getPriority();
+ properties = new TypedProperties(other.getProperties());
+ // Note, this is a shallow copy - does not copy the buffer
+ buffer = other.buffer;
+ this.bufferValid = other.bufferValid;
+ this.endOfBodyPosition = other.endOfBodyPosition;
+ this.endOfMessagePosition = other.endOfMessagePosition;
+ this.copied = other.copied;
+ }
+
+ public void bodyChanged()
+ {
+ // If the body is changed we must copy the buffer otherwise can affect the
previously sent message
+ // which might be in the Netty write queue
+ checkCopy();
+
+ bufferValid = false;
+
+ this.endOfBodyPosition = -1;
+ }
+
+ public void checkCopy()
+ {
+ if (!copied)
+ {
+ forceCopy();
+
+ copied = true;
+ }
+ }
- public void forceCopy()
+ public void resetCopied()
{
- // Must copy buffer before sending it
- int wi = buffer.writerIndex();
+ copied = false;
+ }
+
+ private void forceCopy()
+ {
+ // Must copy buffer before sending it
- this.buffer = buffer.copy(0, buffer.capacity());
+ buffer = buffer.copy(0, buffer.capacity());
- this.buffer.setIndex(0, wi);
+ buffer.setIndex(0, this.endOfBodyPosition);
+
+ if (bodyBuffer != null)
+ {
+ bodyBuffer.setBuffer(buffer);
+ }
}
-
+
public int getEndOfMessagePosition()
{
return this.endOfMessagePosition;
}
-
+
public int getEndOfBodyPosition()
{
return this.endOfBodyPosition;
}
- //Encode to journal or paging
+ // Encode to journal or paging
public void encode(HornetQBuffer buff)
{
encodeToBuffer();
buff.writeBytes(buffer, PacketImpl.PACKET_HEADERS_SIZE, endOfMessagePosition -
PacketImpl.PACKET_HEADERS_SIZE);
}
-
- //Decode from journal or paging
+
+ // Decode from journal or paging
public void decode(HornetQBuffer buff)
{
int start = buff.readerIndex();
endOfBodyPosition = buff.readInt();
-
+
endOfMessagePosition = buff.getInt(endOfBodyPosition -
PacketImpl.PACKET_HEADERS_SIZE + start);
-
- int endPos = endOfMessagePosition + start -
- PacketImpl.PACKET_HEADERS_SIZE;
- this.buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE);
+ int length = endOfMessagePosition - PacketImpl.PACKET_HEADERS_SIZE;
- buff.writeBytes(buffer, start, endPos - start);
-
- decode();
+ buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE);
+
+ buffer.writeBytes(buff, start, length);
+
+ decode();
+
+ buff.readerIndex(start + length);
}
- public HornetQBuffer encodeToBuffer()
- {
- log.info("encoding msg to buffer, valid " + bufferValid);
-
+ //This must be synchronized as it can be called concurrently id the message is being
delivered concurently to
+ //many queues - the first caller in this case will actually encode it
+ public synchronized HornetQBuffer encodeToBuffer()
+ {
if (!bufferValid)
- {
+ {
if (endOfBodyPosition == -1)
- {
- //Means sending message for first time
- endOfBodyPosition = buffer.writerIndex();
-
- //write it
- buffer.setInt(PacketImpl.PACKET_HEADERS_SIZE, endOfBodyPosition);
-
- log.info("setting end of body pos as " + endOfBodyPosition);
+ {
+ // Means sending message for first time
+ endOfBodyPosition = buffer.writerIndex();
}
+ // write it
+ buffer.setInt(PacketImpl.PACKET_HEADERS_SIZE, endOfBodyPosition);
+
// Position at end of body and skip past the message end position int
- buffer.writerIndex(endOfBodyPosition + DataConstants.SIZE_INT);
+ buffer.setIndex(0, endOfBodyPosition + DataConstants.SIZE_INT);
encodeHeadersAndProperties(buffer);
// Write end of message position
-
+
this.endOfMessagePosition = buffer.writerIndex();
buffer.setInt(endOfBodyPosition, endOfMessagePosition);
-
+
this.bufferValid = true;
}
-
+
return buffer;
}
public void decode()
{
this.endOfBodyPosition = buffer.getInt(PacketImpl.PACKET_HEADERS_SIZE);
-
+
buffer.readerIndex(this.endOfBodyPosition + DataConstants.SIZE_INT);
this.decodeHeadersAndProperties(buffer);
-
+
this.endOfMessagePosition = buffer.readerIndex();
-
- log.info("decoded end of body pos as " + this.endOfBodyPosition);
this.bufferValid = true;
}
-
+
public void decodeFromBuffer(HornetQBuffer buffer)
{
this.buffer = buffer;
-
+
decode();
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PageImpl.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PageImpl.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -115,7 +115,7 @@
int oldPos = fileBuffer.readerIndex();
if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity()
&& fileBuffer.getByte(oldPos + messageSize) == END_BYTE)
{
- PagedMessage msg = new PagedMessageImpl();
+ PagedMessage msg = new PagedMessageImpl();
msg.decode(fileBuffer);
byte b = fileBuffer.readByte();
if (b != END_BYTE)
@@ -124,7 +124,7 @@
// constraint was already checked
throw new IllegalStateException("Internal error, it wasn't
possible to locate END_BYTE " + b);
}
- messages.add(msg);
+ messages.add(msg);
}
else
{
@@ -147,15 +147,11 @@
public void write(final PagedMessage message) throws Exception
{
- log.info("encode size is " + message.getEncodeSize());
-
ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + SIZE_RECORD);
HornetQBuffer wrap = HornetQChannelBuffers.wrappedBuffer(buffer);
wrap.clear();
- log.info("wrapped " + wrap.channelBuffer());
-
wrap.writeByte(START_BYTE);
wrap.writeInt(0);
int startIndex = wrap.writerIndex();
@@ -172,8 +168,6 @@
size.addAndGet(buffer.limit());
storageManager.pageWrite(message, pageId);
-
- log.info("wrote page");
}
public void sync() throws Exception
Modified:
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -68,8 +68,7 @@
}
public PagedMessageImpl()
- {
- this(new ServerMessageImpl());
+ {
}
public ServerMessage getMessage(final StorageManager storage)
@@ -109,11 +108,10 @@
{
buffer.readInt(); // This value is only used on LargeMessages for now
- message = new ServerMessageImpl();
+ message = new ServerMessageImpl(-1, 50);
message.decode(buffer);
}
-
}
public void encode(final HornetQBuffer buffer)
Modified:
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -13,20 +13,20 @@
package org.hornetq.core.remoting.impl.wireformat;
-import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.Message;
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.utils.DataConstants;
/**
* A MessagePacket
*
- * @author tim
+ * @author Tim Fox
*
*
*/
public abstract class MessagePacket extends PacketImpl
{
+ private static final Logger log = Logger.getLogger(MessagePacket.class);
+
protected Message message;
public MessagePacket(final byte type, final Message message)
@@ -41,41 +41,4 @@
return message;
}
- @Override
- public HornetQBuffer encode(final RemotingConnection connection)
- {
- HornetQBuffer buffer = message.encodeToBuffer();
-
- buffer.setIndex(0, message.getEndOfMessagePosition());
-
- encodeExtraData(buffer);
-
- size = buffer.writerIndex();
-
- //Write standard headers
-
- int len = size - DataConstants.SIZE_INT;
- buffer.setInt(0, len);
- buffer.setByte(DataConstants.SIZE_INT, type);
- buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
-
- //Position reader for reading by Netty
- buffer.readerIndex(0);
-
- return buffer;
- }
-
- @Override
- public void decodeRest(HornetQBuffer buffer)
- {
- //Buffer comes in after having read standard headers and positioned at Beginning of
body part
-
- message.decodeFromBuffer(buffer);
-
- decodeExtraData(buffer);
- }
-
- protected abstract void encodeExtraData(HornetQBuffer buffer);
-
- protected abstract void decodeExtraData(HornetQBuffer buffer);
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -78,8 +78,7 @@
buffer.writeByte(journalID);
buffer.writeBoolean(isUpdate);
buffer.writeLong(id);
- buffer.writeByte(recordType);
- log.info("encode size is " + encodingData.getEncodeSize());
+ buffer.writeByte(recordType);
buffer.writeInt(encodingData.getEncodeSize());
encodingData.encode(buffer);
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -17,6 +17,7 @@
import org.hornetq.core.client.impl.ClientMessageImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.Message;
+import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.utils.DataConstants;
/**
@@ -43,7 +44,12 @@
this.deliveryCount = deliveryCount;
- message.forceCopy();
+ //If the message hasn't already been copied when the headers/properties/body
was changed since last send
+ //(which will prompt an invalidate(), which will cause a copy if not copied
already)
+ //Then the message needs to be copied before delivering - the previous send may be
in the Netty write queue
+ //so we can't just use the same buffer. Also we can't just duplicate, since
the extra data (consumerID, deliveryCount)
+ //may well be different on different calls
+ //message.forceCopy();
}
public SessionReceiveMessage()
@@ -67,31 +73,55 @@
// Protected -----------------------------------------------------
- protected void encodeExtraData(HornetQBuffer buffer)
+ @Override
+ public HornetQBuffer encode(final RemotingConnection connection)
{
+ //message.setEndOfBodyPosition();
+
+ HornetQBuffer orig = message.encodeToBuffer();
+
+ //Now we must copy this buffer, before sending to Netty, as it could be
concurrently delivered to many consumers
+
+ HornetQBuffer buffer = orig.copy(0, orig.capacity());
+
+ buffer.setIndex(0, message.getEndOfMessagePosition());
+
buffer.writeLong(consumerID);
buffer.writeInt(deliveryCount);
+
+ size = buffer.writerIndex();
+
+ //Write standard headers
+
+ int len = size - DataConstants.SIZE_INT;
+ buffer.setInt(0, len);
+ buffer.setByte(DataConstants.SIZE_INT, type);
+ buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
+
+ //Position reader for reading by Netty
+ buffer.readerIndex(0);
+
+ return buffer;
}
- protected void decodeExtraData(HornetQBuffer buffer)
- {
- consumerID = buffer.readLong();
- deliveryCount = buffer.readInt();
- }
@Override
- public void decodeRest(HornetQBuffer buffer)
+ public void decode(HornetQBuffer buffer)
{
- //Buffer comes in after having read standard headers and positioned at Beginning of
body part
-
+ channelID = buffer.readLong();
+
message.decodeFromBuffer(buffer);
- decodeExtraData(buffer);
+ consumerID = buffer.readLong();
+ deliveryCount = buffer.readInt();
+
+ size = buffer.readerIndex();
+
//Need to position buffer for reading
- buffer.setIndex(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT,
message.getEndOfBodyPosition());
+ buffer.setIndex(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT,
message.getEndOfBodyPosition());
}
-
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified:
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -16,7 +16,9 @@
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.Message;
+import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.utils.DataConstants;
/**
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
@@ -43,7 +45,12 @@
this.requiresResponse = requiresResponse;
- message.forceCopy();
+ //If the message hasn't already been copied when the headers/properties/body
was changed since last send
+ //(which will prompt an invalidate(), which will cause a copy if not copied
already)
+ //Then the message needs to be copied before sending - the previous send may be in
the Netty write queue
+ //so we can't just use the same buffer. Also we can't just duplicate, since
the extra data (requiresResponse)
+ //may be different on different calls
+ message.checkCopy();
}
public SessionSendMessage()
@@ -62,16 +69,49 @@
// Protected -----------------------------------------------------
- protected void encodeExtraData(HornetQBuffer buffer)
+ @Override
+ public HornetQBuffer encode(final RemotingConnection connection)
{
+ //this isn't right when forwarding a message that has been already received -
because writerindex will
+ //be pointing at end of message
+
+ HornetQBuffer orig = message.encodeToBuffer();
+
+ //FIXME - for now we are copying due to concurrent sends to many bridges on the
server
+
+ HornetQBuffer buffer = orig.copy(0, orig.capacity());
+
+ buffer.setIndex(0, message.getEndOfMessagePosition());
+
buffer.writeBoolean(requiresResponse);
+
+ size = buffer.writerIndex();
+
+ //Write standard headers
+
+ int len = size - DataConstants.SIZE_INT;
+ buffer.setInt(0, len);
+ buffer.setByte(DataConstants.SIZE_INT, type);
+ buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
+
+ //Position reader for reading by Netty
+ buffer.readerIndex(0);
+
+ message.resetCopied();
+
+ return buffer;
}
- protected void decodeExtraData(HornetQBuffer buffer)
+ @Override
+ public void decodeRest(HornetQBuffer buffer)
{
- requiresResponse = buffer.readBoolean();
+ //Buffer comes in after having read standard headers and positioned at Beginning of
body part
+
+ message.decodeFromBuffer(buffer);
+
+ requiresResponse = buffer.readBoolean();
}
-
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified:
branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -178,7 +178,7 @@
}
public void route(final ServerMessage message, final RoutingContext context)
- {
+ {
byte[] ids = message.getBytesProperty(idsHeaderName);
if (ids == null)
Modified:
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -184,12 +184,13 @@
}
public HandleStatus handle(final MessageReference ref) throws Exception
- {
+ {
if (availableCredits != null && availableCredits.get() <= 0)
- {
+ {
+
return HandleStatus.BUSY;
}
-
+
lock.lock();
try
@@ -210,7 +211,7 @@
}
final ServerMessage message = ref.getMessage();
-
+
if (filter != null && !filter.match(message))
{
return HandleStatus.NO_MATCH;
@@ -344,17 +345,23 @@
{
public void run()
{
- promptDelivery(false);
+ try
+ {
+ promptDelivery(false);
- ServerMessage forcedDeliveryMessage = new
ServerMessageImpl(storageManager.generateUniqueID(),
- 50);
+ ServerMessage forcedDeliveryMessage = new
ServerMessageImpl(storageManager.generateUniqueID(), 50);
-
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE,
sequence);
- forcedDeliveryMessage.setDestination(messageQueue.getName());
+
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE,
sequence);
+ forcedDeliveryMessage.setDestination(messageQueue.getName());
- final SessionReceiveMessage packet = new SessionReceiveMessage(id,
forcedDeliveryMessage, 0);
+ final SessionReceiveMessage packet = new SessionReceiveMessage(id,
forcedDeliveryMessage, 0);
- channel.send(packet);
+ channel.send(packet);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to send forced delivery message", e);
+ }
}
});
}
@@ -409,7 +416,8 @@
}
public void receiveCredits(final int credits) throws Exception
- {
+ {
+
if (credits == -1)
{
// No flow control
@@ -449,7 +457,7 @@
// Acknowledge acknowledges all refs delivered by the consumer up to and including
the one explicitly
// acknowledged
-
+
MessageReference ref;
do
{
@@ -576,13 +584,13 @@
* @param message
*/
private void deliverStandardMessage(final MessageReference ref, final ServerMessage
message)
- {
+ {
final SessionReceiveMessage packet = new SessionReceiveMessage(id, message,
ref.getDeliveryCount());
-
+
channel.send(packet);
-
+
if (availableCredits != null)
- {
+ {
availableCredits.addAndGet(-packet.getPacketSize());
}
@@ -673,18 +681,19 @@
largeMessage.encodeHeadersAndProperties(headerBuffer);
SessionReceiveLargeMessage initialPacket = new
SessionReceiveLargeMessage(id,
-
headerBuffer.toByteBuffer().array(),
+
headerBuffer.toByteBuffer()
+
.array(),
largeMessage.getLargeBodySize(),
ref.getDeliveryCount());
context = largeMessage.getBodyEncoder();
context.open();
-
+
sentInitialPacket = true;
channel.send(initialPacket);
-
+
if (availableCredits != null)
{
availableCredits.addAndGet(-initialPacket.getPacketSize());
@@ -712,16 +721,16 @@
SessionReceiveContinuationMessage chunk = createChunkSend(context);
int chunkLen = chunk.getBody().length;
-
+
channel.send(chunk);
-
+
if (trace)
{
trace("deliverLargeMessage: Sending " + chunk.getPacketSize()
+
" availableCredits now is " +
availableCredits);
}
-
+
if (availableCredits != null)
{
availableCredits.addAndGet(-chunk.getPacketSize());
@@ -847,7 +856,7 @@
{
MessageReference ref = iterator.next();
try
- {
+ {
HandleStatus status = handle(ref);
if (status == HandleStatus.BUSY)
{
Modified:
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -17,7 +17,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.PropertyConversionException;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
@@ -71,17 +70,7 @@
*/
protected ServerMessageImpl(final ServerMessageImpl other)
{
- this();
- messageID = other.getMessageID();
- destination = other.getDestination();
- type = other.getType();
- durable = other.isDurable();
- expiration = other.getExpiration();
- timestamp = other.getTimestamp();
- priority = other.getPriority();
- properties = new TypedProperties(other.getProperties());
- //Note, this is a shallow copy - does not copy the buffer
- buffer = other.buffer;
+ super(other);
}
public void setMessageID(final long id)
@@ -310,219 +299,7 @@
{
// We first set the message id - this needs to be set on the buffer since this
buffer will be re-used
- //log.info(System.identityHashCode(this) + " encoded message id " +
messageID + " etb " + this.encodedToBuffer);
-
buffer.setLong(buffer.getInt(PacketImpl.PACKET_HEADERS_SIZE) +
DataConstants.SIZE_INT, messageID);
}
- @Override
- public void putBooleanProperty(SimpleString key, boolean value)
- {
- super.putBooleanProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putBooleanProperty(String key, boolean value)
- {
- super.putBooleanProperty(key, value);
-
- bufferValid = false;;
- }
-
- @Override
- public void putByteProperty(SimpleString key, byte value)
- {
- super.putByteProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putByteProperty(String key, byte value)
- {
- super.putByteProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putBytesProperty(SimpleString key, byte[] value)
- {
- super.putBytesProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putBytesProperty(String key, byte[] value)
- {
- super.putBytesProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putDoubleProperty(SimpleString key, double value)
- {
- super.putDoubleProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putDoubleProperty(String key, double value)
- {
- super.putDoubleProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putFloatProperty(SimpleString key, float value)
- {
- super.putFloatProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putFloatProperty(String key, float value)
- {
- super.putFloatProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putIntProperty(SimpleString key, int value)
- {
- super.putIntProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putIntProperty(String key, int value)
- {
- super.putIntProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putLongProperty(SimpleString key, long value)
- {
- super.putLongProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putLongProperty(String key, long value)
- {
- super.putLongProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putObjectProperty(SimpleString key, Object value) throws
PropertyConversionException
- {
- super.putObjectProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putObjectProperty(String key, Object value) throws
PropertyConversionException
- {
- super.putObjectProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putShortProperty(SimpleString key, short value)
- {
- super.putShortProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putShortProperty(String key, short value)
- {
- super.putShortProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putStringProperty(SimpleString key, SimpleString value)
- {
- super.putStringProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putStringProperty(String key, String value)
- {
- super.putStringProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putTypedProperties(TypedProperties otherProps)
- {
- super.putTypedProperties(otherProps);
-
- bufferValid = false;
- }
-
- @Override
- public void setDestination(SimpleString destination)
- {
- super.setDestination(destination);
-
- bufferValid = false;
- }
-
- @Override
- public void setDurable(boolean durable)
- {
- super.setDurable(durable);
-
- bufferValid = false;
- }
-
- @Override
- public void setExpiration(long expiration)
- {
- super.setExpiration(expiration);
-
- bufferValid = false;
- }
-
- @Override
- public void setPriority(byte priority)
- {
- super.setPriority(priority);
-
- bufferValid = false;
- }
-
- @Override
- public void setTimestamp(long timestamp)
- {
- super.setTimestamp(timestamp);
-
- bufferValid = false;
- }
-
-
-
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -1434,7 +1434,7 @@
public void handleSendLargeMessage(final SessionSendLargeMessage packet)
{
-
+
// need to create the LargeMessage before continue
long id = storageManager.generateUniqueID();
@@ -1454,7 +1454,7 @@
}
public void handleSend(final SessionSendMessage packet)
- {
+ {
Packet response = null;
ServerMessage message = (ServerMessage)packet.getMessage();
@@ -1574,7 +1574,7 @@
final CreditManagerHolder holder = this.getCreditManagerHolder(address);
int credits = packet.getCredits();
-
+
int gotCredits = holder.manager.acquireCredits(credits, new
CreditsAvailableRunnable()
{
public boolean run(final int credits)
@@ -1594,7 +1594,7 @@
}
}
});
-
+
if (gotCredits > 0)
{
sendProducerCredits(holder, gotCredits, address);
@@ -1942,7 +1942,7 @@
private void sendProducerCredits(final CreditManagerHolder holder, final int credits,
final SimpleString address)
{
holder.outstandingCredits += credits;
-
+
Packet packet = new SessionProducerCreditsMessage(credits, address, -1);
channel.send(packet);
@@ -1967,7 +1967,7 @@
}
if (tx == null || autoCommitSends)
- {
+ {
postOffice.route(msg);
}
else
Modified: branches/20-optimisation/src/main/org/hornetq/utils/UTF8Util.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/utils/UTF8Util.java 2009-11-24 19:05:15
UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/utils/UTF8Util.java 2009-11-25 00:45:52
UTC (rev 8396)
@@ -41,7 +41,7 @@
private static ThreadLocal<SoftReference<StringUtilBuffer>> currenBuffer =
new ThreadLocal<SoftReference<StringUtilBuffer>>();
- public static void saveUTF(final HornetQBuffer out, final String str) throws
IOException
+ public static void saveUTF(final HornetQBuffer out, final String str)
{
StringUtilBuffer buffer = getThreadLocalBuffer();
@@ -107,7 +107,7 @@
}
}
- public static String readUTF(final HornetQBuffer input) throws IOException
+ public static String readUTF(final HornetQBuffer input)
{
StringUtilBuffer buffer = getThreadLocalBuffer();
Modified:
branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java
===================================================================
---
branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -158,11 +158,13 @@
browser = session.createBrowser(queue1);
en = browser.getEnumeration();
+ log.info("browsing");
+
count = 0;
while (en.hasMoreElements())
{
Message mess = (Message)en.nextElement();
- log.trace("message:" + mess);
+ log.info("message:" + mess);
count++;
}
Modified:
branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java
===================================================================
---
branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -37,7 +37,6 @@
// Public --------------------------------------------------------
-
public void testSimpleRollback() throws Exception
{
// send a message
@@ -45,44 +44,44 @@
try
{
- conn = cf.createConnection();
- Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- s.createProducer(queue1).send(s.createTextMessage("one"));
+ conn = cf.createConnection();
+ Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ s.createProducer(queue1).send(s.createTextMessage("one"));
- s.close();
+ s.close();
- s = conn.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer c = s.createConsumer(queue1);
- conn.start();
- Message m = c.receive(1000);
- assertNotNull(m);
+ s = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer c = s.createConsumer(queue1);
+ conn.start();
+ Message m = c.receive(1000);
+ assertNotNull(m);
- assertEquals("one", ((TextMessage)m).getText());
- assertFalse(m.getJMSRedelivered());
- assertEquals(1, m.getIntProperty("JMSXDeliveryCount"));
+ assertEquals("one", ((TextMessage)m).getText());
+ assertFalse(m.getJMSRedelivered());
+ assertEquals(1, m.getIntProperty("JMSXDeliveryCount"));
- s.rollback();
+ s.rollback();
- // get the message again
- m = c.receive(1000);
- assertNotNull(m);
-
- assertTrue(m.getJMSRedelivered());
- assertEquals(2, m.getIntProperty("JMSXDeliveryCount"));
+ // get the message again
+ m = c.receive(1000);
+ assertNotNull(m);
- conn.close();
+ assertTrue(m.getJMSRedelivered());
+ assertEquals(2, m.getIntProperty("JMSXDeliveryCount"));
- Integer i = getMessageCountForQueue("Queue1");
+ conn.close();
+ Integer i = getMessageCountForQueue("Queue1");
+
assertEquals(1, i.intValue());
}
finally
{
- if (conn != null)
- {
- conn.close();
- }
- removeAllMessages(queue1.getQueueName(), true);
+ if (conn != null)
+ {
+ conn.close();
+ }
+ removeAllMessages(queue1.getQueueName(), true);
}
}
@@ -107,11 +106,11 @@
TextMessage mRec1 = (TextMessage)consumer1.receive(2000);
assertNotNull(mRec1);
-
+
assertEquals("igloo", mRec1.getText());
assertFalse(mRec1.getJMSRedelivered());
- sess1.rollback(); //causes redelivery for session
+ sess1.rollback(); // causes redelivery for session
mRec1 = (TextMessage)consumer1.receive(2000);
assertEquals("igloo", mRec1.getText());
@@ -128,7 +127,6 @@
}
}
-
/** Test redelivery works ok for Topic */
public void testRedeliveredTopic() throws Exception
{
@@ -187,8 +185,10 @@
MessageConsumer consumer = sess.createConsumer(topic1);
conn.start();
+ log.info("sending message first time");
TextMessage mSent = sess.createTextMessage("igloo");
producer.send(mSent);
+ log.info("sent message first time");
sess.commit();
@@ -197,12 +197,15 @@
sess.commit();
+ log.info("sending message again");
mSent.setText("rollback");
producer.send(mSent);
+ log.info("sent message again");
sess.commit();
mRec = (TextMessage)consumer.receive(2000);
+ assertEquals("rollback", mRec.getText());
sess.rollback();
TextMessage mRec2 = (TextMessage)consumer.receive(2000);
@@ -243,7 +246,7 @@
final int NUM_MESSAGES = 10;
- //Send some messages
+ // Send some messages
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
@@ -283,7 +286,7 @@
final int NUM_MESSAGES = 10;
- //Send some messages
+ // Send some messages
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
@@ -296,7 +299,8 @@
while (true)
{
Message m = consumer.receive(500);
- if (m == null) break;
+ if (m == null)
+ break;
count++;
}
@@ -335,7 +339,7 @@
final int NUM_MESSAGES = 10;
- //Send some messages
+ // Send some messages
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
@@ -345,7 +349,8 @@
while (true)
{
Message m = consumer.receive(500);
- if (m == null) break;
+ if (m == null)
+ break;
count++;
}
@@ -378,8 +383,6 @@
}
-
-
/*
* Send some messages in a transacted session.
* Rollback the session.
@@ -402,7 +405,7 @@
final int NUM_MESSAGES = 10;
- //Send some messages
+ // Send some messages
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
@@ -503,7 +506,7 @@
assertEquals(1, tm.getIntProperty("JMSXDeliveryCount"));
sess.rollback();
-
+
sess.close();
Session sess2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -537,7 +540,6 @@
MessageConsumer consumer = sess.createConsumer(queue1);
conn.start();
-
TextMessage mSent = sess.createTextMessage("igloo");
producer.send(mSent);
log.trace("sent1");
@@ -560,7 +562,7 @@
log.trace("Receiving 2");
mRec = (TextMessage)consumer.receive(1000);
assertNotNull(mRec);
-
+
log.trace("Received 2");
assertNotNull(mRec);
assertEquals("rollback", mRec.getText());
@@ -599,7 +601,7 @@
final int NUM_MESSAGES = 10;
- //Send some messages
+ // Send some messages
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
@@ -617,9 +619,6 @@
}
}
-
-
-
/**
* Send some messages in transacted session. Commit.
* Verify message are received by consumer.
@@ -641,7 +640,7 @@
final int NUM_MESSAGES = 10;
- //Send some messages
+ // Send some messages
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
@@ -654,7 +653,8 @@
while (true)
{
Message m = consumer.receive(500);
- if (m == null) break;
+ if (m == null)
+ break;
count++;
}
@@ -671,7 +671,6 @@
}
-
/**
* Test IllegateStateException is thrown if commit is called on a non-transacted
session
*/
@@ -706,7 +705,6 @@
}
}
-
/**
* Send some messages.
* Receive them in a transacted session.
@@ -733,7 +731,7 @@
final int NUM_MESSAGES = 10;
- //Send some messages
+ // Send some messages
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
@@ -744,7 +742,8 @@
while (true)
{
Message m = consumer.receive(500);
- if (m == null) break;
+ if (m == null)
+ break;
count++;
}
@@ -752,9 +751,9 @@
conn.stop();
consumer.close();
-
+
conn.close();
-
+
conn = cf.createConnection();
consumerSess = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
@@ -762,15 +761,15 @@
conn.start();
count = 0;
-
while (true)
{
Message m = consumer.receive(500);
- if (m == null) break;
+ if (m == null)
+ break;
count++;
}
-
+
assertEquals(NUM_MESSAGES, count);
}
finally
@@ -783,9 +782,6 @@
}
}
-
-
-
/**
* Send some messages.
* Receive them in a transacted session.
@@ -810,7 +806,7 @@
final int NUM_MESSAGES = 10;
- //Send some messages
+ // Send some messages
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
@@ -821,7 +817,8 @@
while (true)
{
Message m = consumer.receive(500);
- if (m == null) break;
+ if (m == null)
+ break;
count++;
}
@@ -854,8 +851,6 @@
}
-
-
/*
* Send some messages in a transacted session.
* Rollback the session.
@@ -878,7 +873,7 @@
final int NUM_MESSAGES = 10;
- //Send some messages
+ // Send some messages
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
@@ -900,7 +895,6 @@
}
}
-
/*
* Test IllegateStateException is thrown if rollback is
* called on a non-transacted session
@@ -938,7 +932,6 @@
}
}
-
/*
* Send some messages.
* Receive them in a transacted session.
@@ -965,7 +958,7 @@
final int NUM_MESSAGES = 10;
- //Send some messages
+ // Send some messages
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
@@ -976,7 +969,8 @@
while (true)
{
Message m = consumer.receive(500);
- if (m == null) break;
+ if (m == null)
+ break;
count++;
}
@@ -999,7 +993,8 @@
while (true)
{
Message m = consumer.receive(500);
- if (m == null) break;
+ if (m == null)
+ break;
count++;
}
@@ -1017,7 +1012,6 @@
}
-
/*
* Send multiple messages in multiple contiguous sessions
*/
@@ -1039,7 +1033,7 @@
final int NUM_MESSAGES = 10;
final int NUM_TX = 10;
- //Send some messages
+ // Send some messages
for (int j = 0; j < NUM_TX; j++)
{
@@ -1056,7 +1050,8 @@
while (true)
{
Message m = consumer.receive(500);
- if (m == null) break;
+ if (m == null)
+ break;
count++;
m.acknowledge();
}
@@ -1080,5 +1075,3 @@
// Inner classes -------------------------------------------------
}
-
-
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/AckBatchSizeTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/AckBatchSizeTest.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/AckBatchSizeTest.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -78,21 +78,18 @@
cp.send(sendSession.createClientMessage(false));
}
- log.info("sent messages");
-
ClientConsumer consumer = session.createConsumer(queueA);
session.start();
for (int i = 0; i < numMessages - 1; i++)
{
ClientMessage m = consumer.receive(5000);
- log.info("got message " + i);
m.acknowledge();
}
ClientMessage m = consumer.receive(5000);
Queue q = (Queue)server.getPostOffice().getBinding(queueA).getBindable();
- assertEquals(1, q.getDeliveringCount());
+ assertEquals(100, q.getDeliveringCount());
m.acknowledge();
assertEquals(0, q.getDeliveringCount());
sendSession.close();
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -105,7 +105,7 @@
log.info("sent messages");
ClientConsumer consumer = session.createConsumer(QUEUE);
-
+
session.start();
for (int i = 0; i < numMessages; i++)
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -26,6 +26,7 @@
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
@@ -39,6 +40,8 @@
*/
public class DeadLetterAddressTest extends UnitTestCase
{
+ private static final Logger log = Logger.getLogger(DeadLetterAddressTest.class);
+
private HornetQServer server;
private ClientSession clientSession;
@@ -236,7 +239,9 @@
for (int i = 0; i < deliveryAttempt; i++)
{
ClientMessage m = clientConsumer.receive(500);
- assertNotNull(m);
+ assertNotNull(m);
+ log.info("i is " + i);
+ log.info("delivery cout is " +m.getDeliveryCount());
assertEquals(i + 1, m.getDeliveryCount());
m.acknowledge();
clientSession.rollback();
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -85,12 +85,8 @@
int bodySize = message.getBodySize();
- log.info("body size is " + bodySize);
-
for (int i = 0; i < 10; i++)
{
- log.info("sending " + i);
-
ClientMessage received = sendAndReceive(message);
assertNotNull(received);
@@ -107,10 +103,12 @@
{
ClientMessage message = session.createClientMessage(false);
+ String body = RandomUtil.randomString();
+
for (int i = 0; i < 10; i++)
{
- log.info("iteration " + i);
- final String body = RandomUtil.randomString();
+ //Make the body a bit longer each time
+ body += "XX";
message.getBodyBuffer().writeString(body);
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -23,7 +23,6 @@
import junit.framework.TestSuite;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
@@ -41,7 +40,6 @@
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.integration.largemessage.LargeMessageTestBase;
import org.hornetq.tests.util.RandomUtil;
-import org.hornetq.utils.DataConstants;
import org.hornetq.utils.SimpleString;
/**
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -64,8 +64,6 @@
startServers();
- log.info("********** started servers");
-
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -110,8 +108,6 @@
this.closeAllSessionFactories();
- log.info("** stopping servers");
-
stopServers(0, 1, 2, 3, 4);
startServers();
@@ -167,33 +163,24 @@
startServers();
- log.info("*** started servers");
-
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
setupSessionFactory(3, isNetty());
setupSessionFactory(4, isNetty());
- log.info("** created session factories");
-
- createQueue(0, "queues.testaddress", "queue0", null, false);
-
+ createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(1, "queues.testaddress", "queue0", null, false);
createQueue(2, "queues.testaddress", "queue0", null, false);
createQueue(3, "queues.testaddress", "queue0", null, false);
createQueue(4, "queues.testaddress", "queue0", null, false);
- log.info("**** created queues");
-
addConsumer(0, 0, "queue0", null);
addConsumer(1, 1, "queue0", null);
addConsumer(2, 2, "queue0", null);
addConsumer(3, 3, "queue0", null);
addConsumer(4, 4, "queue0", null);
- log.info("*** created consumers");
-
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
waitForBindings(2, "queues.testaddress", 1, 1, true);
@@ -206,14 +193,8 @@
waitForBindings(3, "queues.testaddress", 4, 4, false);
waitForBindings(4, "queues.testaddress", 4, 4, false);
- log.info("** sending messages");
-
send(0, "queues.testaddress", 10, false, null);
- log.info("** sent messages");
-
- // this.checkReceive(0, 1, 2, 3, 4);
-
verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
this.verifyNotReceive(0, 1, 2, 3, 4);
@@ -1181,6 +1162,10 @@
waitForBindings(2, "queues.testaddress", 4, 4, false);
waitForBindings(3, "queues.testaddress", 4, 4, false);
waitForBindings(4, "queues.testaddress", 4, 4, false);
+
+ // this.checkReceive(0, 1, 2, 3, 4);
+
+ //Thread.sleep(300000);
verifyReceiveAll(10, 0, 1, 2, 3, 4);
}
@@ -1436,12 +1421,9 @@
closeSessionFactory(3);
stopServers(0, 3);
- log.info("stopped servers");
startServers(3, 0);
-
- log.info("restarted servers");
-
+
Thread.sleep(2000);
setupSessionFactory(0, isNetty());
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -17,6 +17,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQ;
@@ -30,6 +31,8 @@
*/
public class MultiThreadRandomReattachTest extends MultiThreadRandomReattachTestBase
{
+ private static final Logger log =
Logger.getLogger(MultiThreadRandomReattachTest.class);
+
@Override
protected void start() throws Exception
{
@@ -47,6 +50,8 @@
@Override
protected void setBody(final ClientMessage message) throws Exception
{
+ //Give each msg a body
+ message.getBodyBuffer().writeBytes(new byte[500]);
}
/* (non-Javadoc)
@@ -55,7 +60,7 @@
@Override
protected boolean checkSize(final ClientMessage message)
{
- return 0 == message.getBodyBuffer().writerIndex();
+ return message.getBodyBuffer().readableBytes() == 500;
}
}
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -167,12 +167,12 @@
final SimpleString address = new SimpleString("Destination "
+ i);
- ServerMessageImpl implMsg = new ServerMessageImpl();
+ ServerMessageImpl implMsg = new ServerMessageImpl(i, 1000);
implMsg.putStringProperty(new SimpleString("Key"), new
SimpleString("This String is worthless!"));
implMsg.setMessageID(i);
- implMsg.setBuffer(HornetQChannelBuffers.wrappedBuffer(bytes));
+ implMsg.getBodyBuffer().writeBytes(bytes);
implMsg.setDestination(address);
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -25,10 +25,12 @@
import org.hornetq.core.paging.impl.PageImpl;
import org.hornetq.core.paging.impl.PagedMessageImpl;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.DataConstants;
import org.hornetq.utils.SimpleString;
/**
@@ -207,9 +209,9 @@
for (int i = 0; i < numberOfElements; i++)
{
- ServerMessage msg = new ServerMessageImpl(i, 1000);
+ ServerMessage msg = new ServerMessageImpl(i, 100);
- for (int j = 0; j < msg.getBodyBuffer().capacity(); j++)
+ for (int j = 0; j < 10; j++)
{
msg.getBodyBuffer().writeByte((byte)'b');
}
@@ -225,8 +227,6 @@
return buffers;
}
-
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -238,5 +238,4 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
-
}
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-24
19:05:15 UTC (rev 8395)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-25
00:45:52 UTC (rev 8396)
@@ -286,6 +286,18 @@
class FakeMessage implements ServerMessage
{
+ public void decode(HornetQBuffer buffer)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void encode(HornetQBuffer buffer)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
public ServerMessage copy() throws Exception
{
// TODO Auto-generated method stub
@@ -322,12 +334,6 @@
}
- public int getEndMessagePosition()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
public int getMemoryEstimate()
{
// TODO Auto-generated method stub
@@ -376,43 +382,43 @@
return false;
}
- public void setEndMessagePosition(int pos)
+ public void setMessageID(long id)
{
// TODO Auto-generated method stub
}
- public void setMessageID(long id)
+ public void setOriginalHeaders(ServerMessage other, boolean expiry)
{
// TODO Auto-generated method stub
}
- public void setNeedsEncoding()
+ public void setPagingStore(PagingStore store)
{
// TODO Auto-generated method stub
}
- public void setOriginalHeaders(ServerMessage other, boolean expiry)
+ public boolean storeIsPaging()
{
// TODO Auto-generated method stub
-
+ return false;
}
- public void setPagingStore(PagingStore store)
+ public void bodyChanged()
{
// TODO Auto-generated method stub
}
- public boolean storeIsPaging()
+ public void checkCopy()
{
// TODO Auto-generated method stub
- return false;
+
}
- public void afterSend()
+ public void clearCopied()
{
// TODO Auto-generated method stub
@@ -430,7 +436,7 @@
return false;
}
- public void decodeFromWire(HornetQBuffer buffer)
+ public void decodeFromBuffer(HornetQBuffer buffer)
{
// TODO Auto-generated method stub
@@ -448,6 +454,12 @@
}
+ public HornetQBuffer encodeToBuffer()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
public HornetQBuffer getBodyBuffer()
{
// TODO Auto-generated method stub
@@ -526,6 +538,18 @@
return 0;
}
+ public int getEndOfBodyPosition()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ public int getEndOfMessagePosition()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
public long getExpiration()
{
// TODO Auto-generated method stub
@@ -670,24 +694,12 @@
return null;
}
- public boolean isBufferWritten()
- {
- // TODO Auto-generated method stub
- return false;
- }
-
public boolean isDurable()
{
// TODO Auto-generated method stub
return false;
}
- public boolean isEncodedToBuffer()
- {
- // TODO Auto-generated method stub
- return false;
- }
-
public boolean isExpired()
{
// TODO Auto-generated method stub
@@ -838,12 +850,6 @@
return null;
}
- public void setBuffer(HornetQBuffer buffer)
- {
- // TODO Auto-generated method stub
-
- }
-
public void setDestination(SimpleString destination)
{
// TODO Auto-generated method stub
@@ -880,91 +886,20 @@
return null;
}
- public void decode(HornetQBuffer buffer)
+ public void resetCopied()
{
// TODO Auto-generated method stub
}
- public void encode(HornetQBuffer buffer)
+ public void setEndOfBodyPosition()
{
// TODO Auto-generated method stub
}
- public void setEncodedToBuffer(boolean encoded)
- {
- // TODO Auto-generated method stub
-
- }
- public void decodeFromBuffer(HornetQBuffer buffer)
- {
- // TODO Auto-generated method stub
-
- }
- public HornetQBuffer encodeToBuffer()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public int getBodySize()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public int getEndOfMessagePosition()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public void saveToOutputStream(OutputStream out) throws HornetQException
- {
- // TODO Auto-generated method stub
-
- }
-
- public void setBodyInputStream(InputStream bodyInputStream)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void setOutputStream(OutputStream out) throws HornetQException
- {
- // TODO Auto-generated method stub
-
- }
-
- public boolean waitOutputStreamCompletion(long timeMilliseconds) throws
HornetQException
- {
- // TODO Auto-generated method stub
- return false;
- }
-
- public void beforeDeliver()
- {
- // TODO Auto-generated method stub
-
- }
-
- public void beforeSend()
- {
- // TODO Auto-generated method stub
-
- }
-
- public void forceCopy()
- {
- // TODO Auto-generated method stub
-
- }
-
-
}
class FakeFilter implements Filter