JBoss hornetq SVN: r8401 - in branches/20-optimisation: src/main/org/hornetq/core/client/impl and 31 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-25 06:07:36 -0500 (Wed, 25 Nov 2009)
New Revision: 8401
Added:
branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQBuffers.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/jms/client/ReSendLargeMessageTest.java
Removed:
branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQChannelBuffers.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/20-optimisation/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/DivertImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQMessage.java
branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQStreamMessage.java
branches/20-optimisation/src/main/org/hornetq/utils/TypedProperties.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementHelperTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementServiceImplTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/CompactingStressTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/timing/util/UTF8Test.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/filter/impl/FilterTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/ChannelBufferWrapper2Test.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/util/TypedPropertiesTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/util/UTF8Test.java
branches/20-optimisation/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
optimisation
Copied: branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQBuffers.java (from rev 8387, branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQChannelBuffers.java)
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQBuffers.java (rev 0)
+++ branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQBuffers.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.buffers;
+
+import java.nio.ByteBuffer;
+
+import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+
+/**
+ *
+ * A HornetQChannelBuffers
+ *
+ * @author tim
+ *
+ *
+ */
+public class HornetQBuffers
+{
+ public static HornetQBuffer dynamicBuffer(int estimatedLength)
+ {
+ return new ChannelBufferWrapper(ChannelBuffers.dynamicBuffer(estimatedLength));
+ }
+
+ public static HornetQBuffer dynamicBuffer(byte[] bytes)
+ {
+ ChannelBuffer buff = ChannelBuffers.dynamicBuffer(bytes.length);
+
+ buff.writeBytes(bytes);
+
+ return new ChannelBufferWrapper(buff);
+ }
+
+ public static HornetQBuffer wrappedBuffer(ByteBuffer underlying)
+ {
+ HornetQBuffer buff = new ChannelBufferWrapper(ChannelBuffers.wrappedBuffer(underlying));
+
+ buff.clear();
+
+ return buff;
+ }
+
+ public static HornetQBuffer wrappedBuffer(byte[] underlying)
+ {
+ return new ChannelBufferWrapper(ChannelBuffers.wrappedBuffer(underlying));
+ }
+
+ public static HornetQBuffer fixedBuffer(int size)
+ {
+ return new ChannelBufferWrapper(ChannelBuffers.buffer(size));
+ }
+}
Deleted: branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQChannelBuffers.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQChannelBuffers.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQChannelBuffers.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -1,61 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.buffers;
-
-import java.nio.ByteBuffer;
-
-import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-
-
-/**
- *
- * A HornetQChannelBuffers
- *
- * @author tim
- *
- *
- */
-public class HornetQChannelBuffers
-{
- public static HornetQBuffer dynamicBuffer(int estimatedLength)
- {
- return new ChannelBufferWrapper(ChannelBuffers.dynamicBuffer(estimatedLength));
- }
-
- public static HornetQBuffer dynamicBuffer(byte[] bytes)
- {
- ChannelBuffer buff = ChannelBuffers.dynamicBuffer(bytes.length);
-
- buff.writeBytes(bytes);
-
- return new ChannelBufferWrapper(buff);
- }
-
- public static HornetQBuffer wrappedBuffer(ByteBuffer underlying)
- {
- return new ChannelBufferWrapper(ChannelBuffers.wrappedBuffer(underlying));
- }
-
- public static HornetQBuffer wrappedBuffer(byte[] underlying)
- {
- return new ChannelBufferWrapper(ChannelBuffers.wrappedBuffer(underlying));
- }
-
- public static HornetQBuffer fixedBuffer(int size)
- {
- return new ChannelBufferWrapper(ChannelBuffers.buffer(size));
- }
-}
Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -17,7 +17,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.MessageHandler;
import org.hornetq.core.exception.HornetQException;
@@ -491,7 +491,7 @@
//FIXME - this is really inefficient - decoding from a buffer to a byte[] then from the byte[] to another buffer
//which is then decoded to form the message! Clebert, what were you thinking?
- currentChunkMessage.decodeHeadersAndProperties(HornetQChannelBuffers.wrappedBuffer(packet.getLargeMessageHeader()));
+ currentChunkMessage.decodeHeadersAndProperties(HornetQBuffers.wrappedBuffer(packet.getLargeMessageHeader()));
currentChunkMessage.setLargeMessage(true);
Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -19,7 +19,7 @@
import java.io.InputStream;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.BodyEncoder;
@@ -308,7 +308,7 @@
msg.getWholeBuffer().readerIndex(0);
}
- HornetQBuffer headerBuffer = HornetQChannelBuffers.fixedBuffer(headerSize);
+ HornetQBuffer headerBuffer = HornetQBuffers.fixedBuffer(headerSize);
msg.encodeHeadersAndProperties(headerBuffer);
@@ -359,7 +359,7 @@
final int chunkLength = Math.min((int)(bodySize - pos), minLargeMessageSize);
- final HornetQBuffer bodyBuffer = HornetQChannelBuffers.fixedBuffer(chunkLength);
+ final HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(chunkLength);
context.encode(bodyBuffer, chunkLength);
Modified: branches/20-optimisation/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -24,7 +24,7 @@
import java.util.Map;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.cluster.DiscoveryEntry;
import org.hornetq.core.cluster.DiscoveryGroup;
import org.hornetq.core.cluster.DiscoveryListener;
@@ -328,7 +328,7 @@
}
}
- HornetQBuffer buffer = HornetQChannelBuffers.wrappedBuffer(data);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(data);
String originatingNodeID = buffer.readString();
Modified: branches/20-optimisation/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -19,7 +19,7 @@
import java.util.Set;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -97,12 +97,12 @@
{
controlFile.open(1);
- HornetQBuffer renameBuffer = HornetQChannelBuffers.dynamicBuffer(1);
+ HornetQBuffer renameBuffer = HornetQBuffers.dynamicBuffer(1);
renameBuffer.writeInt(-1);
renameBuffer.writeInt(-1);
- HornetQBuffer filesToRename = HornetQChannelBuffers.dynamicBuffer(1);
+ HornetQBuffer filesToRename = HornetQBuffers.dynamicBuffer(1);
// DataFiles first
@@ -205,8 +205,8 @@
flush();
ByteBuffer bufferWrite = fileFactory.newBuffer(journal.getFileSize());
- writingChannel = HornetQChannelBuffers.wrappedBuffer(bufferWrite);
-
+ writingChannel = HornetQBuffers.wrappedBuffer(bufferWrite);
+
currentFile = journal.getFile(false, false, false, true);
sequentialFile = currentFile.getFile();
Modified: branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -22,7 +22,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -83,7 +83,7 @@
}
else
{
- HornetQBuffer input = HornetQChannelBuffers.wrappedBuffer(records.get(0).data);
+ HornetQBuffer input = HornetQBuffers.wrappedBuffer(records.get(0).data);
int numberDataFiles = input.readInt();
Modified: branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -42,7 +42,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.JournalLoadInformation;
@@ -3331,7 +3331,7 @@
private HornetQBuffer newBuffer(final int size)
{
- return HornetQChannelBuffers.fixedBuffer(size);
+ return HornetQBuffers.fixedBuffer(size);
}
// Inner classes
Modified: branches/20-optimisation/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -22,7 +22,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.VariableLatch;
@@ -104,7 +104,7 @@
}
// Setting the interval for nano-sleeps
- buffer = HornetQChannelBuffers.fixedBuffer(bufferSize);
+ buffer = HornetQBuffers.fixedBuffer(bufferSize);
buffer.clear();
bufferLimit = 0;
Modified: branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -24,7 +24,7 @@
import java.util.Set;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.buffers.impl.ResetLimitWrappedHornetQBuffer;
import org.hornetq.core.client.LargeMessageBuffer;
import org.hornetq.core.exception.HornetQException;
@@ -96,8 +96,6 @@
protected HornetQBuffer buffer;
- // private int encodeSize;
-
// Constructors --------------------------------------------------
protected MessageImpl()
@@ -139,7 +137,7 @@
private void createBody(final int initialMessageBufferSize)
{
- buffer = HornetQChannelBuffers.dynamicBuffer(initialMessageBufferSize);
+ buffer = HornetQBuffers.dynamicBuffer(initialMessageBufferSize);
// There's a bug in netty which means a dynamic buffer won't resize until you write a byte
buffer.writeByte((byte)0);
@@ -148,17 +146,17 @@
buffer.setIndex(limit, limit);
- //endOfBodyPosition = limit;
+ // endOfBodyPosition = limit;
}
// Message implementation ----------------------------------------
public int getEncodeSize()
- {
- int headersPropsSize = this.getHeadersAndPropertiesEncodeSize();
-
+ {
+ int headersPropsSize = this.getHeadersAndPropertiesEncodeSize();
+
int bodyPos = this.endOfBodyPosition == -1 ? buffer.writerIndex() : this.endOfBodyPosition;
-
+
int bodySize = bodyPos - PacketImpl.PACKET_HEADERS_SIZE - DataConstants.SIZE_INT;
return SIZE_INT + bodySize + SIZE_INT + headersPropsSize;
@@ -176,27 +174,27 @@
}
public void encodeHeadersAndProperties(final HornetQBuffer buffer)
- {
+ {
buffer.writeLong(messageID);
- buffer.writeSimpleString(destination);
+ buffer.writeSimpleString(destination);
buffer.writeByte(type);
buffer.writeBoolean(durable);
buffer.writeLong(expiration);
buffer.writeLong(timestamp);
- buffer.writeByte(priority);
+ buffer.writeByte(priority);
properties.encode(buffer);
}
public void decodeHeadersAndProperties(final HornetQBuffer buffer)
{
- messageID = buffer.readLong();
- destination = buffer.readSimpleString();
+ messageID = buffer.readLong();
+ destination = buffer.readSimpleString();
type = buffer.readByte();
durable = buffer.readBoolean();
expiration = buffer.readLong();
timestamp = buffer.readLong();
- priority = buffer.readByte();
- properties.decode(buffer);
+ priority = buffer.readByte();
+ properties.decode(buffer);
}
public long getMessageID()
@@ -679,7 +677,7 @@
public int encode(ByteBuffer bufferRead) throws HornetQException
{
- HornetQBuffer buffer = HornetQChannelBuffers.wrappedBuffer(bufferRead);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bufferRead);
return encode(buffer, bufferRead.capacity());
}
@@ -691,7 +689,6 @@
}
}
-
protected ResetLimitWrappedHornetQBuffer bodyBuffer;
public HornetQBuffer getBodyBuffer()
@@ -724,8 +721,8 @@
/*
* Copy constructor
*/
- protected MessageImpl(final MessageImpl other)
- {
+ protected MessageImpl(final MessageImpl other, final boolean shallow)
+ {
messageID = other.getMessageID();
destination = other.getDestination();
type = other.getType();
@@ -734,12 +731,23 @@
timestamp = other.getTimestamp();
priority = other.getPriority();
properties = new TypedProperties(other.getProperties());
- // Note, this is a shallow copy - does not copy the buffer
- buffer = other.buffer;
+
this.bufferValid = other.bufferValid;
this.endOfBodyPosition = other.endOfBodyPosition;
this.endOfMessagePosition = other.endOfMessagePosition;
this.copied = other.copied;
+
+ if (shallow)
+ {
+ this.buffer = other.buffer;
+ }
+ else
+ {
+ // We need to copy the underlying buffer too, since the different messsages thereafter might have different
+ // properties set on them, making their encoding different
+ buffer = other.buffer.copy(0, other.buffer.capacity());
+ buffer.setIndex(other.buffer.readerIndex(), other.buffer.writerIndex());
+ }
}
public void bodyChanged()
@@ -749,7 +757,7 @@
checkCopy();
bufferValid = false;
-
+
this.endOfBodyPosition = -1;
}
@@ -762,7 +770,7 @@
copied = true;
}
}
-
+
public void resetCopied()
{
copied = false;
@@ -771,17 +779,17 @@
private void forceCopy()
{
// Must copy buffer before sending it
-
+
buffer = buffer.copy(0, buffer.capacity());
-
+
buffer.setIndex(0, this.endOfBodyPosition);
-
+
if (bodyBuffer != null)
{
bodyBuffer.setBuffer(buffer);
}
}
-
+
public int getEndOfMessagePosition()
{
return this.endOfMessagePosition;
@@ -791,12 +799,12 @@
{
return this.endOfBodyPosition;
}
-
+
// Encode to journal or paging
public void encode(HornetQBuffer buff)
{
encodeToBuffer();
-
+
buff.writeBytes(buffer, PacketImpl.PACKET_HEADERS_SIZE, endOfMessagePosition - PacketImpl.PACKET_HEADERS_SIZE);
}
@@ -820,18 +828,18 @@
buff.readerIndex(start + length);
}
- //This must be synchronized as it can be called concurrently id the message is being delivered concurently to
- //many queues - the first caller in this case will actually encode it
+ // This must be synchronized as it can be called concurrently id the message is being delivered concurently to
+ // many queues - the first caller in this case will actually encode it
public synchronized HornetQBuffer encodeToBuffer()
- {
+ {
if (!bufferValid)
- {
+ {
if (endOfBodyPosition == -1)
{
// Means sending message for first time
- endOfBodyPosition = buffer.writerIndex();
+ endOfBodyPosition = buffer.writerIndex();
}
-
+
// write it
buffer.setInt(PacketImpl.PACKET_HEADERS_SIZE, endOfBodyPosition);
@@ -848,7 +856,7 @@
this.bufferValid = true;
}
-
+
return buffer;
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PageImpl.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PageImpl.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -22,7 +22,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -98,7 +98,7 @@
buffer2.rewind();
- HornetQBuffer fileBuffer = HornetQChannelBuffers.wrappedBuffer(buffer2);
+ HornetQBuffer fileBuffer = HornetQBuffers.wrappedBuffer(buffer2);
fileBuffer.writerIndex(fileBuffer.capacity());
while (fileBuffer.readable())
@@ -149,7 +149,7 @@
{
ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + SIZE_RECORD);
- HornetQBuffer wrap = HornetQChannelBuffers.wrappedBuffer(buffer);
+ HornetQBuffer wrap = HornetQBuffers.wrappedBuffer(buffer);
wrap.clear();
wrap.writeByte(START_BYTE);
Modified: branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -18,7 +18,7 @@
import static org.hornetq.utils.DataConstants.SIZE_LONG;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.persistence.StorageManager;
@@ -76,7 +76,7 @@
if (largeMessageLazyData != null)
{
message = storage.createLargeMessage();
- HornetQBuffer buffer = HornetQChannelBuffers.dynamicBuffer(largeMessageLazyData);
+ HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(largeMessageLazyData);
message.decodeHeadersAndProperties(buffer);
largeMessageLazyData = null;
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -33,7 +33,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
@@ -432,7 +432,7 @@
LargeServerMessageImpl largeMessage = (LargeServerMessageImpl)createLargeMessage();
- HornetQBuffer headerBuffer = HornetQChannelBuffers.wrappedBuffer(header);
+ HornetQBuffer headerBuffer = HornetQBuffers.wrappedBuffer(header);
largeMessage.decodeHeadersAndProperties(headerBuffer);
@@ -680,7 +680,7 @@
{
byte[] data = record.data;
- HornetQBuffer buff = HornetQChannelBuffers.wrappedBuffer(data);
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
try
{
@@ -721,7 +721,7 @@
{
byte[] data = record.data;
- HornetQBuffer buff = HornetQChannelBuffers.wrappedBuffer(data);
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
byte recordType = record.getUserRecordType();
@@ -1008,7 +1008,7 @@
{
byte[] data = record.data;
- HornetQBuffer buff = HornetQChannelBuffers.wrappedBuffer(data);
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
byte recordType = record.getUserRecordType();
@@ -1140,7 +1140,7 @@
{
byte[] data = record.data;
- HornetQBuffer buff = HornetQChannelBuffers.wrappedBuffer(data);
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
long messageID = record.id;
@@ -1224,7 +1224,7 @@
{
long id = record.id;
- HornetQBuffer buffer = HornetQChannelBuffers.wrappedBuffer(record.data);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(record.data);
byte rec = record.getUserRecordType();
@@ -1441,7 +1441,7 @@
XidEncoding(final byte[] data)
{
- xid = XidCodecSupport.decodeXid(HornetQChannelBuffers.wrappedBuffer(data));
+ xid = XidCodecSupport.decodeXid(HornetQBuffers.wrappedBuffer(data));
}
public void decode(final HornetQBuffer buffer)
Modified: branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -14,7 +14,7 @@
package org.hornetq.core.persistence.impl.nullpm;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
Modified: branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -20,7 +20,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
@@ -196,7 +196,7 @@
{
NullStorageLargeServerMessage largeMessage = new NullStorageLargeServerMessage();
- HornetQBuffer headerBuffer = HornetQChannelBuffers.wrappedBuffer(header);
+ HornetQBuffer headerBuffer = HornetQBuffers.wrappedBuffer(header);
largeMessage.decodeHeadersAndProperties(headerBuffer);
Modified: branches/20-optimisation/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -25,7 +25,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.management.impl.ManagementHelper;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -16,7 +16,7 @@
import java.util.concurrent.RejectedExecutionException;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.spi.BufferHandler;
@@ -97,7 +97,7 @@
public HornetQBuffer createBuffer(final int size)
{
- return HornetQChannelBuffers.dynamicBuffer(size);
+ return HornetQBuffers.dynamicBuffer(size);
}
public Object getID()
Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -43,13 +43,6 @@
this.consumerID = consumerID;
this.deliveryCount = deliveryCount;
-
- //If the message hasn't already been copied when the headers/properties/body was changed since last send
- //(which will prompt an invalidate(), which will cause a copy if not copied already)
- //Then the message needs to be copied before delivering - the previous send may be in the Netty write queue
- //so we can't just use the same buffer. Also we can't just duplicate, since the extra data (consumerID, deliveryCount)
- //may well be different on different calls
- //message.forceCopy();
}
public SessionReceiveMessage()
@@ -76,8 +69,6 @@
@Override
public HornetQBuffer encode(final RemotingConnection connection)
{
- //message.setEndOfBodyPosition();
-
HornetQBuffer orig = message.encodeToBuffer();
//Now we must copy this buffer, before sending to Netty, as it could be concurrently delivered to many consumers
@@ -85,7 +76,7 @@
HornetQBuffer buffer = orig.copy(0, orig.capacity());
buffer.setIndex(0, message.getEndOfMessagePosition());
-
+
buffer.writeLong(consumerID);
buffer.writeInt(deliveryCount);
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -39,9 +39,11 @@
int decrementDurableRefCount();
- ServerMessage copy(long newID) throws Exception;
+ ServerMessage copy(long newID);
- ServerMessage copy() throws Exception;
+ ServerMessage copy();
+
+ ServerMessage shallowCopy();
int getMemoryEstimate();
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -416,7 +416,7 @@
// the one pertinent for the destination node - this is important since different queues on different
// nodes could have same queue ids
// Note we must copy since same message may get routed to other nodes which require different headers
- message = message.copy();
+ message = message.shallowCopy();
// TODO - we can optimise this
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -21,7 +21,7 @@
import java.util.concurrent.ScheduledFuture;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.Notification;
@@ -203,7 +203,7 @@
return;
}
- HornetQBuffer buff = HornetQChannelBuffers.dynamicBuffer(4096);
+ HornetQBuffer buff = HornetQBuffers.dynamicBuffer(4096);
buff.writeString(nodeID);
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -83,6 +83,8 @@
// TODO we can optimise this so it doesn't copy if it's not routed anywhere else
+ log.info("making copy for divert");
+
ServerMessage copy = message.copy();
copy.setDestination(forwardAddress);
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -31,7 +31,7 @@
import javax.management.MBeanServer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.FailoverManager;
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -22,7 +22,7 @@
import java.util.concurrent.locks.ReentrantLock;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.impl.ClientConsumerImpl;
import org.hornetq.core.client.management.impl.ManagementHelper;
import org.hornetq.core.exception.HornetQException;
@@ -676,7 +676,7 @@
if (!sentInitialPacket)
{
- HornetQBuffer headerBuffer = HornetQChannelBuffers.fixedBuffer(largeMessage.getHeadersAndPropertiesEncodeSize());
+ HornetQBuffer headerBuffer = HornetQBuffers.fixedBuffer(largeMessage.getHeadersAndPropertiesEncodeSize());
largeMessage.encodeHeadersAndProperties(headerBuffer);
@@ -805,7 +805,7 @@
localChunkLen = (int)Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
- HornetQBuffer bodyBuffer = HornetQChannelBuffers.fixedBuffer(localChunkLen);
+ HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(localChunkLen);
context.encode(bodyBuffer, localChunkLen);
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -68,9 +68,9 @@
/*
* Copy constructor
*/
- protected ServerMessageImpl(final ServerMessageImpl other)
+ protected ServerMessageImpl(final ServerMessageImpl other, final boolean shallow)
{
- super(other);
+ super(other, shallow);
}
public void setMessageID(final long id)
@@ -162,19 +162,24 @@
return memoryEstimate;
}
- public ServerMessage copy(final long newID) throws Exception
+ public ServerMessage copy(final long newID)
{
- ServerMessage m = new ServerMessageImpl(this);
+ ServerMessage m = new ServerMessageImpl(this, false);
m.setMessageID(newID);
return m;
}
- public ServerMessage copy() throws Exception
+ public ServerMessage copy()
{
- return new ServerMessageImpl(this);
+ return new ServerMessageImpl(this, false);
}
+
+ public ServerMessage shallowCopy()
+ {
+ return new ServerMessageImpl(this, true);
+ }
public ServerMessage makeCopyForExpiryOrDLA(final long newID, final boolean expiry) throws Exception
{
Modified: branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQMessage.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQMessage.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -34,7 +34,7 @@
import javax.jms.MessageNotWriteableException;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.impl.ClientMessageImpl;
Modified: branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQStreamMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQStreamMessage.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQStreamMessage.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -19,7 +19,7 @@
import javax.jms.StreamMessage;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.impl.ClientMessageImpl;
Modified: branches/20-optimisation/src/main/org/hornetq/utils/TypedProperties.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/utils/TypedProperties.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/utils/TypedProperties.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -454,7 +454,7 @@
for (int i = 0; i < numHeaders; i++)
{
- int len = buffer.readInt();
+ int len = buffer.readInt();
byte[] data = new byte[len];
buffer.readBytes(data);
SimpleString key = new SimpleString(data);
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -20,7 +20,7 @@
import junit.framework.AssertionFailedError;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -20,7 +20,7 @@
import java.util.List;
import java.util.Map;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -17,7 +17,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
Copied: branches/20-optimisation/tests/src/org/hornetq/tests/integration/jms/client/ReSendLargeMessageTest.java (from rev 8344, branches/20-optimisation/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java)
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/jms/client/ReSendLargeMessageTest.java (rev 0)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/jms/client/ReSendLargeMessageTest.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -0,0 +1,311 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.jms.client;
+
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.jms.client.HornetQMessage;
+import org.hornetq.tests.util.JMSTestBase;
+import org.hornetq.utils.Pair;
+
+/**
+ * Receive Messages and resend them, like the bridge would do
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReSendLargeMessageTest extends JMSTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private Queue queue;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testResendMessage() throws Exception
+ {
+ Connection conn = cf.createConnection();
+ try
+ {
+ conn.start();
+
+ Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ ArrayList<Message> msgs = new ArrayList<Message>();
+
+ for (int i = 0; i < 10; i++)
+ {
+ BytesMessage bm = sess.createBytesMessage();
+ bm.setObjectProperty(HornetQMessage.JMS_HORNETQ_INPUT_STREAM,
+ createFakeLargeStream(2 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE));
+ msgs.add(bm);
+
+ MapMessage mm = sess.createMapMessage();
+ mm.setBoolean("boolean", true);
+ mm.setByte("byte", (byte)3);
+ mm.setBytes("bytes", new byte[] { (byte)3, (byte)4, (byte)5 });
+ mm.setChar("char", (char)6);
+ mm.setDouble("double", 7.0);
+ mm.setFloat("float", 8.0f);
+ mm.setInt("int", 9);
+ mm.setLong("long", 10l);
+ mm.setObject("object", new String("this is an object"));
+ mm.setShort("short", (short)11);
+ mm.setString("string", "this is a string");
+
+ msgs.add(mm);
+ msgs.add(sess.createTextMessage("hello" + i));
+ msgs.add(sess.createObjectMessage(new SomeSerializable("hello" + i)));
+ }
+
+ internalTestResend(msgs, sess);
+
+ }
+ finally
+ {
+ conn.close();
+ }
+
+ }
+
+ public void internalTestResend(ArrayList<Message> msgs, Session sess) throws Exception
+ {
+ MessageProducer prod = sess.createProducer(queue);
+
+ for (Message msg : msgs)
+ {
+ prod.send(msg);
+ }
+
+ sess.commit();
+
+ MessageConsumer cons = sess.createConsumer(queue);
+
+ for (int i = 0; i < msgs.size(); i++)
+ {
+ Message msg = cons.receive(5000);
+ assertNotNull(msg);
+
+ prod.send(msg);
+ }
+
+ assertNull(cons.receiveNoWait());
+
+ sess.commit();
+
+ for (Message originalMessage : msgs)
+ {
+ Message copiedMessage = cons.receive(5000);
+ assertNotNull(copiedMessage);
+
+ assertEquals(copiedMessage.getClass(), originalMessage.getClass());
+
+ sess.commit();
+
+ if (copiedMessage instanceof BytesMessage)
+ {
+ BytesMessage copiedBytes = (BytesMessage)copiedMessage;
+
+ for (int i = 0; i < copiedBytes.getBodyLength(); i++)
+ {
+ assertEquals(getSamplebyte(i), copiedBytes.readByte());
+ }
+ }
+ else if (copiedMessage instanceof MapMessage)
+ {
+ MapMessage copiedMap = (MapMessage)copiedMessage;
+ MapMessage originalMap = (MapMessage)originalMessage;
+ assertEquals(originalMap.getString("str"), copiedMap.getString("str"));
+ assertEquals(originalMap.getLong("long"), copiedMap.getLong("long"));
+ assertEquals(originalMap.getInt("int"), copiedMap.getInt("int"));
+ assertEquals(originalMap.getObject("object"), copiedMap.getObject("object"));
+ }
+ else if (copiedMessage instanceof ObjectMessage)
+ {
+ assertNotSame(((ObjectMessage)originalMessage).getObject(), ((ObjectMessage)copiedMessage).getObject());
+ assertEquals(((ObjectMessage)originalMessage).getObject(), ((ObjectMessage)copiedMessage).getObject());
+ }
+ else if (copiedMessage instanceof TextMessage)
+ {
+ assertEquals(((TextMessage)originalMessage).getText(), ((TextMessage)copiedMessage).getText());
+ }
+ }
+
+ }
+
+ public static class SomeSerializable implements Serializable
+ {
+ /**
+ *
+ */
+ private static final long serialVersionUID = -8576054940441747312L;
+
+ final String txt;
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (txt == null ? 0 : txt.hashCode());
+ return result;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(final Object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+ if (obj == null)
+ {
+ return false;
+ }
+ if (getClass() != obj.getClass())
+ {
+ return false;
+ }
+ SomeSerializable other = (SomeSerializable)obj;
+ if (txt == null)
+ {
+ if (other.txt != null)
+ {
+ return false;
+ }
+ }
+ else if (!txt.equals(other.txt))
+ {
+ return false;
+ }
+ return true;
+ }
+
+ SomeSerializable(final String txt)
+ {
+ this.txt = txt;
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+ protected void createCF(List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
+ List<String> jndiBindings) throws Exception
+ {
+ int retryInterval = 1000;
+ double retryIntervalMultiplier = 1.0;
+ int reconnectAttempts = -1;
+ boolean failoverOnServerShutdown = true;
+ int callTimeout = 30000;
+
+ jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest",
+ connectorConfigs,
+ null,
+ DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ DEFAULT_CONNECTION_TTL,
+ callTimeout,
+ true,
+ DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ DEFAULT_CONSUMER_WINDOW_SIZE,
+ DEFAULT_CONSUMER_MAX_RATE,
+ DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ DEFAULT_PRODUCER_WINDOW_SIZE,
+ DEFAULT_PRODUCER_MAX_RATE,
+ DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+ DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+ DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+ DEFAULT_AUTO_GROUP,
+ DEFAULT_PRE_ACKNOWLEDGE,
+ DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+ DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_USE_GLOBAL_POOLS,
+ DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+ DEFAULT_THREAD_POOL_MAX_SIZE,
+ retryInterval,
+ retryIntervalMultiplier,
+ DEFAULT_MAX_RETRY_INTERVAL,
+ reconnectAttempts,
+ failoverOnServerShutdown,
+ jndiBindings);
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ queue = createQueue("queue1");
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ queue = null;
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Deleted: branches/20-optimisation/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -1,312 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.jms.client;
-
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.jms.client.HornetQMessage;
-import org.hornetq.tests.util.JMSTestBase;
-import org.hornetq.utils.Pair;
-
-/**
- * Receive Messages and resend them, like the bridge would do
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class ResendTest extends JMSTestBase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private Queue queue;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testResendMessage() throws Exception
- {
- Connection conn = cf.createConnection();
- try
- {
- conn.start();
-
- Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
- ArrayList<Message> msgs = new ArrayList<Message>();
-
- for (int i = 0; i < 10; i++)
- {
- BytesMessage bm = sess.createBytesMessage();
- bm.setObjectProperty(HornetQMessage.JMS_HORNETQ_INPUT_STREAM,
- createFakeLargeStream(2 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE));
- msgs.add(bm);
-
- MapMessage mm = sess.createMapMessage();
- mm.setBoolean("boolean", true);
- mm.setByte("byte", (byte)3);
- mm.setBytes("bytes", new byte[] { (byte)3, (byte)4, (byte)5 });
- mm.setChar("char", (char)6);
- mm.setDouble("double", 7.0);
- mm.setFloat("float", 8.0f);
- mm.setInt("int", 9);
- mm.setLong("long", 10l);
- mm.setObject("object", new String("this is an object"));
- mm.setShort("short", (short)11);
- mm.setString("string", "this is a string");
-
- msgs.add(mm);
- msgs.add(sess.createTextMessage("hello" + i));
- msgs.add(sess.createObjectMessage(new SomeSerializable("hello" + i)));
- }
-
- internalTestResend(msgs, sess);
-
- }
- finally
- {
- conn.close();
- }
-
- }
-
- public void internalTestResend(ArrayList<Message> msgs, Session sess) throws Exception
- {
-
- MessageProducer prod = sess.createProducer(queue);
-
- for (Message msg : msgs)
- {
- prod.send(msg);
- }
-
- sess.commit();
-
- MessageConsumer cons = sess.createConsumer(queue);
-
- for (int i = 0; i < msgs.size(); i++)
- {
- Message msg = cons.receive(5000);
- assertNotNull(msg);
-
- prod.send(msg);
- }
-
- assertNull(cons.receiveNoWait());
-
- sess.commit();
-
- for (Message originalMessage : msgs)
- {
- Message copiedMessage = cons.receive(5000);
- assertNotNull(copiedMessage);
-
- assertEquals(copiedMessage.getClass(), originalMessage.getClass());
-
- sess.commit();
-
- if (copiedMessage instanceof BytesMessage)
- {
- BytesMessage copiedBytes = (BytesMessage)copiedMessage;
-
- for (int i = 0; i < copiedBytes.getBodyLength(); i++)
- {
- assertEquals(getSamplebyte(i), copiedBytes.readByte());
- }
- }
- else if (copiedMessage instanceof MapMessage)
- {
- MapMessage copiedMap = (MapMessage)copiedMessage;
- MapMessage originalMap = (MapMessage)originalMessage;
- assertEquals(originalMap.getString("str"), copiedMap.getString("str"));
- assertEquals(originalMap.getLong("long"), copiedMap.getLong("long"));
- assertEquals(originalMap.getInt("int"), copiedMap.getInt("int"));
- assertEquals(originalMap.getObject("object"), copiedMap.getObject("object"));
- }
- else if (copiedMessage instanceof ObjectMessage)
- {
- assertNotSame(((ObjectMessage)originalMessage).getObject(), ((ObjectMessage)copiedMessage).getObject());
- assertEquals(((ObjectMessage)originalMessage).getObject(), ((ObjectMessage)copiedMessage).getObject());
- }
- else if (copiedMessage instanceof TextMessage)
- {
- assertEquals(((TextMessage)originalMessage).getText(), ((TextMessage)copiedMessage).getText());
- }
- }
-
- }
-
- public static class SomeSerializable implements Serializable
- {
- /**
- *
- */
- private static final long serialVersionUID = -8576054940441747312L;
-
- final String txt;
-
- /* (non-Javadoc)
- * @see java.lang.Object#hashCode()
- */
- @Override
- public int hashCode()
- {
- final int prime = 31;
- int result = 1;
- result = prime * result + (txt == null ? 0 : txt.hashCode());
- return result;
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#equals(java.lang.Object)
- */
- @Override
- public boolean equals(final Object obj)
- {
- if (this == obj)
- {
- return true;
- }
- if (obj == null)
- {
- return false;
- }
- if (getClass() != obj.getClass())
- {
- return false;
- }
- SomeSerializable other = (SomeSerializable)obj;
- if (txt == null)
- {
- if (other.txt != null)
- {
- return false;
- }
- }
- else if (!txt.equals(other.txt))
- {
- return false;
- }
- return true;
- }
-
- SomeSerializable(final String txt)
- {
- this.txt = txt;
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
- protected void createCF(List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
- List<String> jndiBindings) throws Exception
- {
- int retryInterval = 1000;
- double retryIntervalMultiplier = 1.0;
- int reconnectAttempts = -1;
- boolean failoverOnServerShutdown = true;
- int callTimeout = 30000;
-
- jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest",
- connectorConfigs,
- null,
- DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- DEFAULT_CONNECTION_TTL,
- callTimeout,
- true,
- DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- DEFAULT_CONSUMER_WINDOW_SIZE,
- DEFAULT_CONSUMER_MAX_RATE,
- DEFAULT_CONFIRMATION_WINDOW_SIZE,
- DEFAULT_PRODUCER_WINDOW_SIZE,
- DEFAULT_PRODUCER_MAX_RATE,
- DEFAULT_BLOCK_ON_ACKNOWLEDGE,
- DEFAULT_BLOCK_ON_PERSISTENT_SEND,
- DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
- DEFAULT_AUTO_GROUP,
- DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
- DEFAULT_ACK_BATCH_SIZE,
- DEFAULT_ACK_BATCH_SIZE,
- DEFAULT_USE_GLOBAL_POOLS,
- DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- DEFAULT_THREAD_POOL_MAX_SIZE,
- retryInterval,
- retryIntervalMultiplier,
- DEFAULT_MAX_RETRY_INTERVAL,
- reconnectAttempts,
- failoverOnServerShutdown,
- jndiBindings);
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- queue = createQueue("queue1");
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- queue = null;
- super.tearDown();
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -25,7 +25,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
@@ -590,7 +590,7 @@
protected HornetQBuffer createLargeBuffer(final int numberOfIntegers)
{
- HornetQBuffer body = HornetQChannelBuffers.fixedBuffer(DataConstants.SIZE_INT * numberOfIntegers);
+ HornetQBuffer body = HornetQBuffers.fixedBuffer(DataConstants.SIZE_INT * numberOfIntegers);
for (int i = 0; i < numberOfIntegers; i++)
{
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementHelperTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementHelperTest.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementHelperTest.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -24,7 +24,7 @@
import junit.framework.TestCase;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.impl.ClientMessageImpl;
import org.hornetq.core.client.impl.ClientSessionImpl;
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementServiceImplTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementServiceImplTest.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementServiceImplTest.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -17,7 +17,7 @@
import static org.hornetq.tests.util.RandomUtil.randomString;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.management.impl.ManagementHelper;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -28,7 +28,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.FailoverManager;
@@ -326,7 +326,7 @@
serverMsg.setMessageID(500);
serverMsg.setDestination(new SimpleString("tttt"));
- HornetQBuffer buffer = HornetQChannelBuffers.dynamicBuffer(100);
+ HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(100);
serverMsg.encodeHeadersAndProperties(buffer);
manager.largeMessageBegin(500);
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -19,7 +19,7 @@
import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.config.impl.FileConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/CompactingStressTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/CompactingStressTest.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/CompactingStressTest.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -17,7 +17,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -16,7 +16,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -16,7 +16,7 @@
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.*;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -22,7 +22,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/timing/util/UTF8Test.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/timing/util/UTF8Test.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/timing/util/UTF8Test.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -14,7 +14,7 @@
package org.hornetq.tests.timing.util;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.UTF8Util;
@@ -47,7 +47,7 @@
public void testWriteUTF() throws Exception
{
- HornetQBuffer buffer = HornetQChannelBuffers.fixedBuffer(10 * 1024);
+ HornetQBuffer buffer = HornetQBuffers.fixedBuffer(10 * 1024);
long start = System.currentTimeMillis();
@@ -72,7 +72,7 @@
public void testReadUTF() throws Exception
{
- HornetQBuffer buffer = HornetQChannelBuffers.fixedBuffer(10 * 1024);
+ HornetQBuffer buffer = HornetQBuffers.fixedBuffer(10 * 1024);
buffer.writeUTF(str);
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -26,7 +26,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.MessageHandler;
import org.hornetq.core.client.impl.ClientConsumerInternal;
@@ -114,7 +114,7 @@
{
LargeMessageBufferImpl buffer = create15BytesSample();
- HornetQBuffer dstBuffer = HornetQChannelBuffers.fixedBuffer(20);
+ HornetQBuffer dstBuffer = HornetQBuffers.fixedBuffer(20);
dstBuffer.setIndex(0, 5);
@@ -168,7 +168,7 @@
public void testReadData() throws Exception
{
- HornetQBuffer dynamic = HornetQChannelBuffers.dynamicBuffer(1);
+ HornetQBuffer dynamic = HornetQBuffers.dynamicBuffer(1);
String str1 = RandomUtil.randomString();
String str2 = RandomUtil.randomString();
@@ -197,7 +197,7 @@
{
clearData();
- HornetQBuffer dynamic = HornetQChannelBuffers.dynamicBuffer(1);
+ HornetQBuffer dynamic = HornetQBuffers.dynamicBuffer(1);
String str1 = RandomUtil.randomString();
String str2 = RandomUtil.randomString();
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/filter/impl/FilterTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/filter/impl/FilterTest.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/filter/impl/FilterTest.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -13,10 +13,10 @@
package org.hornetq.tests.unit.core.filter.impl;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.tests.util.RandomUtil;
@@ -32,6 +32,8 @@
*/
public class FilterTest extends UnitTestCase
{
+ private static final Logger log = Logger.getLogger(FilterTest.class);
+
private Filter filter;
private ServerMessage message;
@@ -40,7 +42,7 @@
{
super.setUp();
- message = new ServerMessageImpl();
+ message = new ServerMessageImpl(1, 1000);
}
public void testFilterForgets() throws Exception
@@ -103,16 +105,19 @@
{
message.setDestination(RandomUtil.randomSimpleString());
- assertTrue(message.getEncodeSize() < 1024);
-
- Filter moreThan128 = FilterImpl.createFilter(new SimpleString("HQSize > 128"));
- Filter lessThan1024 = FilterImpl.createFilter(new SimpleString("HQSize < 1024"));
+ int encodeSize = message.getEncodeSize();
+
+ Filter moreThanSmall = FilterImpl.createFilter(new SimpleString("HQSize > " + (encodeSize - 1)));
+ Filter lessThanLarge = FilterImpl.createFilter(new SimpleString("HQSize < " + (encodeSize + 1)));
- assertFalse(moreThan128.match(message));
- assertTrue(lessThan1024.match(message));
+ Filter lessThanSmall = FilterImpl.createFilter(new SimpleString("HQSize < " + encodeSize));
+ Filter moreThanLarge = FilterImpl.createFilter(new SimpleString("HQSize > " + encodeSize));
+
+ assertTrue(moreThanSmall.match(message));
+ assertTrue(lessThanLarge.match(message));
- assertTrue(moreThan128.match(message));
- assertFalse(lessThan1024.match(message));
+ assertFalse(lessThanSmall.match(message));
+ assertFalse(moreThanLarge.match(message));
}
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -18,7 +18,7 @@
import java.util.List;
import java.util.concurrent.Executors;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.impl.PagedMessageImpl;
@@ -112,11 +112,12 @@
protected ServerMessage createMessage(final long messageId, final SimpleString destination, final ByteBuffer buffer)
{
- ServerMessage msg = new ServerMessageImpl();
+ ServerMessage msg = new ServerMessageImpl(messageId, 1000);
- msg.setMessageID(messageId);
-
msg.setDestination(destination);
+
+ msg.getBodyBuffer().writeBytes(buffer);
+
return msg;
}
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -29,7 +29,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -107,7 +107,7 @@
assertEquals(nr1, trans.getNumberOfMessages());
- HornetQBuffer buffer = HornetQChannelBuffers.fixedBuffer(trans.getEncodeSize());
+ HornetQBuffer buffer = HornetQBuffers.fixedBuffer(trans.getEncodeSize());
trans.encode(buffer);
@@ -205,7 +205,7 @@
buffers.add(buffer);
SimpleString destination = new SimpleString("test");
- ServerMessage msg = createMessage(storeImpl, destination, buffer);
+ ServerMessage msg = createMessage(1, storeImpl, destination, buffer);
assertTrue(storeImpl.isPaging());
@@ -268,7 +268,7 @@
buffers.add(buffer);
- ServerMessage msg = createMessage(storeImpl, destination, buffer);
+ ServerMessage msg = createMessage(i, storeImpl, destination, buffer);
assertTrue(storeImpl.page(msg, true));
}
@@ -342,7 +342,7 @@
storeImpl.forceAnotherPage();
}
- ServerMessage msg = createMessage(storeImpl, destination, buffer);
+ ServerMessage msg = createMessage(i, storeImpl, destination, buffer);
assertTrue(storeImpl.page(msg, true));
}
@@ -374,7 +374,7 @@
assertTrue(storeImpl.isPaging());
- ServerMessage msg = createMessage(storeImpl, destination, buffers.get(0));
+ ServerMessage msg = createMessage(1, storeImpl, destination, buffers.get(0));
assertTrue(storeImpl.page(msg, true));
@@ -493,7 +493,7 @@
// Each thread will Keep paging until all the messages are depaged.
// This is possible because the depage thread is not actually reading the pages.
// Just using the internal API to remove it from the page file system
- ServerMessage msg = createMessage(storeImpl, destination, createRandomBuffer(id, 5));
+ ServerMessage msg = createMessage(id, storeImpl, destination, createRandomBuffer(id, 5));
if (storeImpl.page(msg, false))
{
buffers.put(id, msg);
@@ -637,7 +637,7 @@
assertEquals(numberOfPages, storeImpl2.getNumberOfPages());
long lastMessageId = messageIdGenerator.incrementAndGet();
- ServerMessage lastMsg = createMessage(storeImpl, destination, createRandomBuffer(lastMessageId, 5));
+ ServerMessage lastMsg = createMessage(lastMessageId, storeImpl, destination, createRandomBuffer(lastMessageId, 5));
storeImpl2.page(lastMsg, true);
buffers2.put(lastMessageId, lastMsg);
@@ -702,22 +702,24 @@
return new FakePostOffice();
}
- private ServerMessage createMessage(final PagingStore store,
+ private ServerMessage createMessage(final long id, final PagingStore store,
final SimpleString destination,
final HornetQBuffer buffer)
{
- ServerMessage msg = new ServerMessageImpl();
+ ServerMessage msg = new ServerMessageImpl(id, 1000);
msg.setDestination(destination);
msg.setPagingStore(store);
+
+ msg.getBodyBuffer().writeBytes(buffer, buffer.capacity());
return msg;
}
private HornetQBuffer createRandomBuffer(final long id, final int size)
{
- HornetQBuffer buffer = HornetQChannelBuffers.fixedBuffer(size + 8);
+ HornetQBuffer buffer = HornetQBuffers.fixedBuffer(size + 8);
buffer.writeLong(id);
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -17,7 +17,7 @@
import java.util.ArrayList;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
@@ -145,7 +145,7 @@
{
if (record.userRecordType == JournalStorageManager.ID_COUNTER_RECORD)
{
- HornetQBuffer buffer = HornetQChannelBuffers.wrappedBuffer(record.data);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(record.data);
batch.loadState(record.id, buffer);
}
}
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/ChannelBufferWrapper2Test.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/ChannelBufferWrapper2Test.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/ChannelBufferWrapper2Test.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -14,7 +14,7 @@
package org.hornetq.tests.unit.core.remoting.impl.netty;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.tests.unit.core.remoting.HornetQBufferTestBase;
/**
@@ -43,7 +43,7 @@
@Override
protected HornetQBuffer createBuffer()
{
- return HornetQChannelBuffers.dynamicBuffer(512);
+ return HornetQBuffers.dynamicBuffer(512);
}
// Package protected ---------------------------------------------
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -20,7 +20,7 @@
import java.util.List;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.remoting.spi.Connection;
import org.hornetq.core.remoting.spi.ConnectionLifeCycleListener;
@@ -52,7 +52,7 @@
public void testWrite() throws Exception
{
- HornetQBuffer buff = HornetQChannelBuffers.wrappedBuffer(ByteBuffer.allocate(128));
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(ByteBuffer.allocate(128));
SimpleChannel channel = new SimpleChannel(randomInt());
assertEquals(0, channel.getWritten().size());
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/util/TypedPropertiesTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/util/TypedPropertiesTest.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/util/TypedPropertiesTest.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -27,7 +27,7 @@
import java.util.Iterator;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
@@ -210,7 +210,7 @@
SimpleString keyToRemove = randomSimpleString();
props.putSimpleStringProperty(keyToRemove, randomSimpleString());
- HornetQBuffer buffer = HornetQChannelBuffers.dynamicBuffer(1024);
+ HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(1024);
props.encode(buffer);
assertEquals(props.getEncodeSize(), buffer.writerIndex());
@@ -234,7 +234,7 @@
{
TypedProperties emptyProps = new TypedProperties();
- HornetQBuffer buffer = HornetQChannelBuffers.dynamicBuffer(1024);
+ HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(1024);
emptyProps.encode(buffer);
assertEquals(props.getEncodeSize(), buffer.writerIndex());
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/util/UTF8Test.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/util/UTF8Test.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/util/UTF8Test.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -20,7 +20,7 @@
import java.nio.ByteBuffer;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.DataConstants;
@@ -41,7 +41,7 @@
public void testValidateUTF() throws Exception
{
- HornetQBuffer buffer = HornetQChannelBuffers.fixedBuffer(60 * 1024);
+ HornetQBuffer buffer = HornetQBuffers.fixedBuffer(60 * 1024);
byte[] bytes = new byte[20000];
@@ -71,11 +71,11 @@
String str = new String(bytes);
// The maximum size the encoded UTF string would reach is str.length * 3 (look at the UTF8 implementation)
- testValidateUTFOnDataInputStream(str, HornetQChannelBuffers.wrappedBuffer(ByteBuffer.allocate(str.length() * 3 + DataConstants.SIZE_SHORT)));
+ testValidateUTFOnDataInputStream(str, HornetQBuffers.wrappedBuffer(ByteBuffer.allocate(str.length() * 3 + DataConstants.SIZE_SHORT)));
- testValidateUTFOnDataInputStream(str, HornetQChannelBuffers.dynamicBuffer(100));
+ testValidateUTFOnDataInputStream(str, HornetQBuffers.dynamicBuffer(100));
- testValidateUTFOnDataInputStream(str, HornetQChannelBuffers.fixedBuffer(100 * 1024));
+ testValidateUTFOnDataInputStream(str, HornetQBuffers.fixedBuffer(100 * 1024));
}
}
@@ -94,7 +94,7 @@
outData.writeUTF(str);
- HornetQBuffer buffer = HornetQChannelBuffers.wrappedBuffer(byteOut.toByteArray());
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(byteOut.toByteArray());
newStr = UTF8Util.readUTF(buffer);
@@ -113,7 +113,7 @@
String str = new String(chars);
- HornetQBuffer buffer = HornetQChannelBuffers.fixedBuffer(0xffff + 4);
+ HornetQBuffer buffer = HornetQBuffers.fixedBuffer(0xffff + 4);
try
{
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/util/UnitTestCase.java 2009-11-25 03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/util/UnitTestCase.java 2009-11-25 11:07:36 UTC (rev 8401)
@@ -44,7 +44,7 @@
import junit.framework.TestSuite;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.exception.HornetQException;
15 years, 1 month
JBoss hornetq SVN: r8400 - branches.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-24 22:26:45 -0500 (Tue, 24 Nov 2009)
New Revision: 8400
Removed:
branches/ClebertCallback/
Log:
Deleting temporary branch
15 years, 1 month
JBoss hornetq SVN: r8399 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-24 20:40:17 -0500 (Tue, 24 Nov 2009)
New Revision: 8399
Modified:
trunk/src/main/org/hornetq/core/filter/impl/FilterImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java
trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java
Log:
Fixing divert test
Modified: trunk/src/main/org/hornetq/core/filter/impl/FilterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/filter/impl/FilterImpl.java 2009-11-25 00:56:16 UTC (rev 8398)
+++ trunk/src/main/org/hornetq/core/filter/impl/FilterImpl.java 2009-11-25 01:40:17 UTC (rev 8399)
@@ -175,7 +175,17 @@
return false;
}
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "FilterImpl [sfilterString=" + sfilterString + "]";
+ }
+
// Private --------------------------------------------------------------------------
private Object getHeaderFieldValue(final ServerMessage msg, final SimpleString fieldName)
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-25 00:56:16 UTC (rev 8398)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-25 01:40:17 UTC (rev 8399)
@@ -204,7 +204,8 @@
else
{
// The actions need to be done in order...
- // so it must achieve both conditions before we can proceed to more tasks
+ // And they are added in order...
+ // As soon as we're done, we break the loop
break;
}
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java 2009-11-25 00:56:16 UTC (rev 8398)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java 2009-11-25 01:40:17 UTC (rev 8399)
@@ -125,5 +125,14 @@
return BindingType.DIVERT;
}
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "DivertBinding [divert=" + divert + "]";
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java 2009-11-25 00:56:16 UTC (rev 8398)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java 2009-11-25 01:40:17 UTC (rev 8399)
@@ -140,5 +140,16 @@
{
return BindingType.LOCAL_QUEUE;
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "LocalQueueBinding [address=" + address + ", name=" + name + ", filter=" + filter + "]";
+ }
+
+
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-25 00:56:16 UTC (rev 8398)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-25 01:40:17 UTC (rev 8399)
@@ -550,6 +550,11 @@
throw new IllegalStateException("Message cannot be routed more than once");
}
+ if (message.getMessageID() == 0l)
+ {
+ generateID(message);
+ }
+
RoutingContext context = new RoutingContextImpl(tx);
SimpleString address = message.getDestination();
@@ -839,6 +844,15 @@
// Private -----------------------------------------------------------------
+ private void generateID(final ServerMessage message)
+ {
+ // Setting the ID for the routed message
+ long id = storageManager.generateUniqueID();
+
+ message.setMessageID(id);
+ }
+
+
private void setPagingStore(final ServerMessage message) throws Exception
{
PagingStore store = pagingManager.getPageStore(message.getDestination());
Modified: trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-11-25 00:56:16 UTC (rev 8398)
+++ trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-11-25 01:40:17 UTC (rev 8399)
@@ -84,6 +84,9 @@
// TODO we can optimise this so it doesn't copy if it's not routed anywhere else
ServerMessage copy = message.copy();
+
+ // Setting the messageID to 0. The postOffice should set a new one
+ copy.setMessageID(0);
copy.setDestination(forwardAddress);
@@ -116,4 +119,26 @@
{
return filter;
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "DivertImpl [routingName=" + routingName +
+ ", uniqueName=" +
+ uniqueName +
+ ", forwardAddress=" +
+ forwardAddress +
+ ", exclusive=" +
+ exclusive +
+ ", filter=" +
+ filter +
+ ", transformer=" +
+ transformer +
+ "]";
+ }
+
+
}
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-25 00:56:16 UTC (rev 8398)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-25 01:40:17 UTC (rev 8399)
@@ -650,8 +650,6 @@
Channel channel = connection.getChannel(channelID, sendWindowSize);
Executor sessionExecutor = executorFactory.getExecutor();
-
- storageManager.newContext(sessionExecutor);
final ServerSessionImpl session = new ServerSessionImpl(name,
username,
Modified: trunk/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java 2009-11-25 00:56:16 UTC (rev 8398)
+++ trunk/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java 2009-11-25 01:40:17 UTC (rev 8399)
@@ -44,7 +44,7 @@
public class PersistentDivertTest extends ServiceTestBase
{
private static final Logger log = Logger.getLogger(DivertTest.class);
-
+
public void testPersistentDivert() throws Exception
{
Configuration conf = createDefaultConfig();
@@ -79,7 +79,11 @@
ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
- ClientSession session = sf.createSession(false, true, true);
+ sf.setBlockOnAcknowledge(true);
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+
+ ClientSession session = sf.createSession(true, true, 0);
final SimpleString queueName1 = new SimpleString("queue1");
@@ -173,7 +177,6 @@
}
assertNull(consumer4.receiveImmediate());
-
session.close();
sf.close();
@@ -214,10 +217,12 @@
messagingService.start();
ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
-
+
+ sf.setBlockOnAcknowledge(true);
+ sf.setBlockOnNonPersistentSend(true);
sf.setBlockOnPersistentSend(true);
- ClientSession session = sf.createSession(false, true, true);
+ ClientSession session = sf.createSession(true, true, 0);
final SimpleString queueName1 = new SimpleString("queue1");
15 years, 1 month
JBoss hornetq SVN: r8398 - trunk/src/main/org/hornetq/integration/transports/netty.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-24 19:56:16 -0500 (Tue, 24 Nov 2009)
New Revision: 8398
Modified:
trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
Log:
Reverting Trustin's commit for now. Those changes were supposed to be done at the optimization's branch.
Modified: trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2009-11-25 00:54:37 UTC (rev 8397)
+++ trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2009-11-25 00:56:16 UTC (rev 8398)
@@ -47,7 +47,7 @@
public static void addCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
{
assert pipeline != null;
- pipeline.addLast("decoder", new HornetQFrameDecoder());
+ pipeline.addLast("decoder", new HornetQFrameDecoder(handler));
}
public static void addSSLFilter(final ChannelPipeline pipeline, final SSLContext context, final boolean client) throws Exception
Modified: trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java 2009-11-25 00:54:37 UTC (rev 8397)
+++ trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java 2009-11-25 00:56:16 UTC (rev 8398)
@@ -15,17 +15,15 @@
import static org.hornetq.utils.DataConstants.SIZE_INT;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.spi.BufferHandler;
import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.buffer.DynamicChannelBuffer;
+import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
/**
- * A Netty decoder used to decode messages.
+ * A Netty FrameDecoder used to decode messages.
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
@@ -33,151 +31,34 @@
*
* @version $Revision$, $Date$
*/
-@ChannelPipelineCoverage("one")
-public class HornetQFrameDecoder extends SimpleChannelUpstreamHandler
+public class HornetQFrameDecoder extends FrameDecoder
{
- private ChannelBuffer previousData = ChannelBuffers.EMPTY_BUFFER;
+ private static final Logger log = Logger.getLogger(HornetQFrameDecoder.class);
- // SimpleChannelUpstreamHandler overrides
+ private final BufferHandler handler;
+
+ public HornetQFrameDecoder(final BufferHandler handler)
+ {
+ this.handler = handler;
+ }
+
+ // FrameDecoder overrides
// -------------------------------------------------------------------------------------
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer in) throws Exception
{
- ChannelBuffer in = (ChannelBuffer) e.getMessage();
- if (previousData.readable())
- {
- if (previousData.readableBytes() + in.readableBytes() < SIZE_INT) {
- // XXX Length is unknown. Bet at 512. Tune this value.
- append(in, 512);
- return;
- }
-
- // Decode the first message. The first message requires a special
- // treatment because it is the only message that spans over the two
- // buffers.
- final int length;
- switch (previousData.readableBytes()) {
- case 1:
- length = (previousData.getUnsignedByte(previousData.readerIndex()) << 24) |
- in.getMedium(in.readerIndex());
- if (in.readableBytes() - 3 < length) {
- append(in, length);
- return;
- }
- break;
- case 2:
- length = (previousData.getUnsignedShort(previousData.readerIndex()) << 16) |
- in.getUnsignedShort(in.readerIndex());
- if (in.readableBytes() - 2 < length) {
- append(in, length);
- return;
- }
- break;
- case 3:
- length = (previousData.getUnsignedMedium(previousData.readerIndex()) << 8) |
- in.getUnsignedByte(in.readerIndex());
- if (in.readableBytes() - 1 < length) {
- append(in, length);
- return;
- }
- break;
- case 4:
- length = previousData.getInt(previousData.readerIndex());
- if (in.readableBytes() - 4 < length) {
- append(in, length);
- return;
- }
- break;
- default:
- length = previousData.getInt(previousData.readerIndex());
- if (in.readableBytes() + previousData.readableBytes() - 4 < length) {
- append(in, length);
- return;
- }
- }
-
- final ChannelBuffer frame;
- if (previousData instanceof DynamicChannelBuffer) {
- // It's safe to reuse the current dynamic buffer
- // because previousData will be reassigned to
- // EMPTY_BUFFER or 'in' later.
- previousData.writeBytes(in, length + 4 - previousData.readableBytes());
- frame = previousData;
- } else {
- // XXX Tune this value: Increasing the initial capacity of the
- // dynamic buffer might reduce the chance of additional memory
- // copy.
- frame = ChannelBuffers.dynamicBuffer(length + 4);
- frame.writeBytes(previousData, previousData.readerIndex(), previousData.readableBytes());
- frame.writeBytes(in, length + 4 - frame.writerIndex());
- }
+ // TODO - we can avoid this entirely if we maintain fragmented packets in the handler
+ int start = in.readerIndex();
- frame.skipBytes(4);
- Channels.fireMessageReceived(ctx, frame);
-
- if (!in.readable()) {
- previousData = ChannelBuffers.EMPTY_BUFFER;
- return;
- }
- }
-
- // And then handle the rest - we don't need to deal with the
- // composite buffer anymore because the second or later messages
- // always belong to the second buffer.
- decode(ctx, in);
-
- // Handle the leftover.
- if (in.readable())
+ int length = handler.isReadyToHandle(new ChannelBufferWrapper(in));
+ if (length == -1)
{
- previousData = in;
+ in.readerIndex(start);
+ return null;
}
- else
- {
- previousData = ChannelBuffers.EMPTY_BUFFER;
- }
+
+ in.readerIndex(start + SIZE_INT);
+ return in.readBytes(length);
}
-
- private void decode(ChannelHandlerContext ctx, ChannelBuffer in)
- {
- for (;;) {
- final int readableBytes = in.readableBytes();
- if (readableBytes < SIZE_INT) {
- break;
- }
-
- final int length = in.getInt(in.readerIndex());
- if (readableBytes < length + SIZE_INT) {
- break;
- }
-
- // Convert to dynamic buffer (this requires copy)
- // XXX Tune this value: Increasing the initial capacity of the dynamic
- // buffer might reduce the chance of additional memory copy.
- ChannelBuffer frame = ChannelBuffers.dynamicBuffer(length + SIZE_INT);
- frame.writeBytes(in, length + SIZE_INT);
- frame.skipBytes(SIZE_INT);
- Channels.fireMessageReceived(ctx, frame);
- }
- }
-
- private void append(ChannelBuffer in, int length)
- {
- // Need more data to decode the first message. This can happen when
- // a client is very slow. (e.g.sending each byte one by one)
- if (previousData instanceof DynamicChannelBuffer)
- {
- previousData.discardReadBytes();
- previousData.writeBytes(in);
- }
- else
- {
- ChannelBuffer newPreviousData =
- ChannelBuffers.dynamicBuffer(
- Math.max(previousData.readableBytes() + in.readableBytes(), length + 4));
- newPreviousData.writeBytes(previousData);
- newPreviousData.writeBytes(in);
- previousData = newPreviousData;
- }
- }
}
15 years, 1 month
JBoss hornetq SVN: r8397 - branches/20-optimisation/src/main/org/hornetq/integration/transports/netty.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-24 19:54:37 -0500 (Tue, 24 Nov 2009)
New Revision: 8397
Modified:
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
Log:
optimisation
Modified: branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2009-11-25 00:45:52 UTC (rev 8396)
+++ branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2009-11-25 00:54:37 UTC (rev 8397)
@@ -47,7 +47,7 @@
public static void addCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
{
assert pipeline != null;
- pipeline.addLast("decoder", new HornetQFrameDecoder2());
+ pipeline.addLast("decoder", new HornetQFrameDecoder(handler));
}
public static void addSSLFilter(final ChannelPipeline pipeline, final SSLContext context, final boolean client) throws Exception
15 years, 1 month
JBoss hornetq SVN: r8396 - in branches/20-optimisation: examples/jms/interceptor/src/org/hornetq/jms/example and 18 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-24 19:45:52 -0500 (Tue, 24 Nov 2009)
New Revision: 8396
Modified:
branches/20-optimisation/.classpath
branches/20-optimisation/build-hornetq.xml
branches/20-optimisation/examples/jms/interceptor/src/org/hornetq/jms/example/SimpleInterceptor.java
branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQBuffer.java
branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ChannelBufferWrapper.java
branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/20-optimisation/src/main/org/hornetq/utils/UTF8Util.java
branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java
branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/AckBatchSizeTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
optimisation
Modified: branches/20-optimisation/.classpath
===================================================================
--- branches/20-optimisation/.classpath 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/.classpath 2009-11-25 00:45:52 UTC (rev 8396)
@@ -100,7 +100,7 @@
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="tests/tmpfiles"/>
<classpathentry kind="lib" path="thirdparty/net/java/dev/javacc/lib/javacc.jar"/>
- <classpathentry kind="lib" path="thirdparty/org/jboss/netty/lib/netty.jar"/>
+ <classpathentry kind="lib" path="thirdparty/org/jboss/netty/lib/netty.jar" sourcepath="/home/tim/workspace/netty-3.1.5.GA/src/main"/>
<classpathentry kind="lib" path="thirdparty/log4j/lib/log4j.jar"/>
<classpathentry kind="lib" path="thirdparty/org/jboss/naming/lib/jnpserver.jar"/>
<classpathentry kind="lib" path="thirdparty/org/jboss/security/lib/jbosssx.jar"/>
Modified: branches/20-optimisation/build-hornetq.xml
===================================================================
--- branches/20-optimisation/build-hornetq.xml 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/build-hornetq.xml 2009-11-25 00:45:52 UTC (rev 8396)
@@ -172,6 +172,10 @@
<!-- Classpath definition -->
<!-- ======================================================================================== -->
+ <path id="core.compilation.classpath">
+ <path refid="org.jboss.netty.classpath"/>
+ </path>
+
<path id="jms.compilation.classpath">
<path location="${build.core.classes.dir}"/>
<path refid="org.jboss.javaee.classpath"/>
@@ -401,6 +405,7 @@
</src>
<include name="**/hornetq/core/**/*.java"/>
<include name="**/hornetq/utils/**/*.java"/>
+ <classpath refid="core.compilation.classpath"/>
</javac>
<javah class="org.hornetq.core.asyncio.impl.AsynchronousFileImpl"
classpath="${build.core.classes.dir}" destdir="./native/src"/>
Modified: branches/20-optimisation/examples/jms/interceptor/src/org/hornetq/jms/example/SimpleInterceptor.java
===================================================================
--- branches/20-optimisation/examples/jms/interceptor/src/org/hornetq/jms/example/SimpleInterceptor.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/examples/jms/interceptor/src/org/hornetq/jms/example/SimpleInterceptor.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -38,7 +38,7 @@
if (packet instanceof SessionSendMessage)
{
SessionSendMessage realPacket = (SessionSendMessage)packet;
- Message msg = realPacket.getServerMessage();
+ Message msg = realPacket.getMessage();
msg.putStringProperty(new SimpleString("newproperty"), new SimpleString("Hello from interceptor!"));
}
//We return true which means "call next interceptor" (if there is one) or target.
Modified: branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQBuffer.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQBuffer.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQBuffer.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -207,7 +207,7 @@
String readString();
- String readUTF() throws Exception;
+ String readUTF();
void writeBoolean(boolean val);
@@ -219,5 +219,5 @@
void writeString(String val);
- void writeUTF(String utf) throws Exception;
+ void writeUTF(String utf);
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ChannelBufferWrapper.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ChannelBufferWrapper.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ChannelBufferWrapper.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -37,7 +37,7 @@
protected ChannelBuffer buffer;
public ChannelBufferWrapper(final ChannelBuffer buffer)
- {
+ {
this.buffer = buffer;
}
@@ -48,69 +48,79 @@
public SimpleString readNullableSimpleString()
{
- int b = readByte();
+ int b = buffer.readByte();
if (b == DataConstants.NULL)
{
return null;
}
else
{
- return readSimpleString();
+ return readSimpleStringInternal();
}
}
public String readNullableString()
{
- int b = readByte();
+ int b = buffer.readByte();
if (b == DataConstants.NULL)
{
return null;
}
else
{
- return readString();
+ return readStringInternal();
}
}
public SimpleString readSimpleString()
{
- int len = readInt();
+ return readSimpleStringInternal();
+ }
+
+ private SimpleString readSimpleStringInternal()
+ {
+ int len = buffer.readInt();
byte[] data = new byte[len];
- readBytes(data);
+ buffer.readBytes(data);
return new SimpleString(data);
}
public String readString()
{
- int len = readInt();
+ return readStringInternal();
+ }
+
+ private String readStringInternal()
+ {
+ int len = buffer.readInt();
char[] chars = new char[len];
for (int i = 0; i < len; i++)
{
- chars[i] = (char)readShort();
+ chars[i] = (char)buffer.readShort();
}
return new String(chars);
}
- public String readUTF() throws Exception
+ public String readUTF()
{
return UTF8Util.readUTF(this);
}
public void writeBoolean(final boolean val)
{
- writeByte((byte)(val ? -1 : 0));
+ buffer.writeByte((byte)(val ? -1 : 0));
}
public void writeNullableSimpleString(final SimpleString val)
{
if (val == null)
{
- writeByte(DataConstants.NULL);
+ buffer.writeByte(DataConstants.NULL);
}
else
{
- writeByte(DataConstants.NOT_NULL);
- writeSimpleString(val);
+ buffer.writeByte(DataConstants.NOT_NULL);
+ writeSimpleStringInternal(val);
}
}
@@ -118,32 +128,42 @@
{
if (val == null)
{
- writeByte(DataConstants.NULL);
+ buffer.writeByte(DataConstants.NULL);
}
else
{
- writeByte(DataConstants.NOT_NULL);
- writeString(val);
+ buffer.writeByte(DataConstants.NOT_NULL);
+ writeStringInternal(val);
}
}
public void writeSimpleString(final SimpleString val)
{
+ writeSimpleStringInternal(val);
+ }
+
+ private void writeSimpleStringInternal(final SimpleString val)
+ {
byte[] data = val.getData();
- writeInt(data.length);
- writeBytes(data);
+ buffer.writeInt(data.length);
+ buffer.writeBytes(data);
}
public void writeString(final String val)
{
- writeInt(val.length());
+ writeStringInternal(val);
+ }
+
+ private void writeStringInternal(final String val)
+ {
+ buffer.writeInt(val.length());
for (int i = 0; i < val.length(); i++)
{
- writeShort((short)val.charAt(i));
+ buffer.writeShort((short)val.charAt(i));
}
}
- public void writeUTF(final String utf) throws Exception
+ public void writeUTF(final String utf)
{
UTF8Util.saveUTF(this, utf);
}
@@ -154,7 +174,6 @@
return buffer.capacity();
}
-
public ChannelBuffer channelBuffer()
{
return buffer;
Modified: branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -13,8 +13,12 @@
package org.hornetq.core.buffers.impl;
+import java.nio.ByteBuffer;
+
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.Message;
+import org.hornetq.utils.SimpleString;
/**
* A ResetLimitWrappedHornetQBuffer
@@ -27,8 +31,11 @@
private static final Logger log = Logger.getLogger(ResetLimitWrappedHornetQBuffer.class);
private final int limit;
-
- public ResetLimitWrappedHornetQBuffer(final int limit, final HornetQBuffer buffer)
+
+ private Message message;
+
+ public ResetLimitWrappedHornetQBuffer(final int limit, final HornetQBuffer buffer,
+ final Message message)
{
super(buffer.channelBuffer());
@@ -40,7 +47,14 @@
}
readerIndex(limit);
+
+ this.message = message;
}
+
+ private void changed()
+ {
+ message.bodyChanged();
+ }
public void setBuffer(HornetQBuffer buffer)
{
@@ -49,9 +63,12 @@
public void clear()
{
+ changed();
+
buffer.clear();
buffer.setIndex(limit, limit);
+
}
public void readerIndex(int readerIndex)
@@ -71,11 +88,15 @@
public void resetWriterIndex()
{
+ changed();
+
buffer.writerIndex(limit);
}
public void setIndex(int readerIndex, int writerIndex)
{
+ changed();
+
if (readerIndex < limit)
{
readerIndex = limit;
@@ -89,10 +110,261 @@
public void writerIndex(int writerIndex)
{
+ changed();
+
if (writerIndex < limit)
{
writerIndex = limit;
}
+
buffer.writerIndex(writerIndex);
}
+
+ @Override
+ public void setByte(int index, byte value)
+ {
+ changed();
+
+ super.setByte(index, value);
+ }
+
+ @Override
+ public void setBytes(int index, byte[] src, int srcIndex, int length)
+ {
+ changed();
+
+ super.setBytes(index, src, srcIndex, length);
+ }
+
+ @Override
+ public void setBytes(int index, byte[] src)
+ {
+ changed();
+
+ super.setBytes(index, src);
+ }
+
+ @Override
+ public void setBytes(int index, ByteBuffer src)
+ {
+ changed();
+
+ super.setBytes(index, src);
+ }
+
+ @Override
+ public void setBytes(int index, HornetQBuffer src, int srcIndex, int length)
+ {
+ changed();
+
+ super.setBytes(index, src, srcIndex, length);
+ }
+
+ @Override
+ public void setBytes(int index, HornetQBuffer src, int length)
+ {
+ changed();
+
+ super.setBytes(index, src, length);
+ }
+
+ @Override
+ public void setBytes(int index, HornetQBuffer src)
+ {
+ changed();
+
+ super.setBytes(index, src);
+ }
+
+ @Override
+ public void setChar(int index, char value)
+ {
+ changed();
+
+ super.setChar(index, value);
+ }
+
+ @Override
+ public void setDouble(int index, double value)
+ {
+ changed();
+
+ super.setDouble(index, value);
+ }
+
+ @Override
+ public void setFloat(int index, float value)
+ {
+ changed();
+
+ super.setFloat(index, value);
+ }
+
+ @Override
+ public void setInt(int index, int value)
+ {
+ changed();
+
+ super.setInt(index, value);
+ }
+
+ @Override
+ public void setLong(int index, long value)
+ {
+ changed();
+
+ super.setLong(index, value);
+ }
+
+ @Override
+ public void setShort(int index, short value)
+ {
+ changed();
+
+ super.setShort(index, value);
+ }
+
+ @Override
+ public void writeBoolean(boolean val)
+ {
+ changed();
+
+ super.writeBoolean(val);
+ }
+
+ @Override
+ public void writeByte(byte value)
+ {
+ changed();
+
+ super.writeByte(value);
+ }
+
+ @Override
+ public void writeBytes(byte[] src, int srcIndex, int length)
+ {
+ changed();
+
+ super.writeBytes(src, srcIndex, length);
+ }
+
+ @Override
+ public void writeBytes(byte[] src)
+ {
+ changed();
+
+ super.writeBytes(src);
+ }
+
+ @Override
+ public void writeBytes(ByteBuffer src)
+ {
+ changed();
+
+ super.writeBytes(src);
+ }
+
+ @Override
+ public void writeBytes(HornetQBuffer src, int srcIndex, int length)
+ {
+ changed();
+
+ super.writeBytes(src, srcIndex, length);
+ }
+
+ @Override
+ public void writeBytes(HornetQBuffer src, int length)
+ {
+ changed();
+
+ super.writeBytes(src, length);
+ }
+
+ @Override
+ public void writeChar(char chr)
+ {
+ changed();
+
+ super.writeChar(chr);
+ }
+
+ @Override
+ public void writeDouble(double value)
+ {
+ changed();
+
+ super.writeDouble(value);
+ }
+
+ @Override
+ public void writeFloat(float value)
+ {
+ changed();
+
+ super.writeFloat(value);
+ }
+
+ @Override
+ public void writeInt(int value)
+ {
+ changed();
+
+ super.writeInt(value);
+ }
+
+ @Override
+ public void writeLong(long value)
+ {
+ changed();
+
+ super.writeLong(value);
+ }
+
+ @Override
+ public void writeNullableSimpleString(SimpleString val)
+ {
+ changed();
+
+ super.writeNullableSimpleString(val);
+ }
+
+ @Override
+ public void writeNullableString(String val)
+ {
+ changed();
+
+ super.writeNullableString(val);
+ }
+
+ @Override
+ public void writeShort(short value)
+ {
+ changed();
+
+ super.writeShort(value);
+ }
+
+ @Override
+ public void writeSimpleString(SimpleString val)
+ {
+ changed();
+
+ super.writeSimpleString(val);
+ }
+
+ @Override
+ public void writeString(String val)
+ {
+ changed();
+
+ super.writeString(val);
+ }
+
+ @Override
+ public void writeUTF(String utf)
+ {
+ changed();
+
+ super.writeUTF(utf);
+ }
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -216,7 +216,7 @@
// we only force delivery once per call to receive
if (!deliveryForced)
- {
+ {
session.forceDelivery(id, forceDeliveryCount.incrementAndGet());
deliveryForced = true;
@@ -249,7 +249,7 @@
session.workDone();
if (m.containsProperty(FORCED_DELIVERY_MESSAGE))
- {
+ {
long seq = m.getLongProperty(FORCED_DELIVERY_MESSAGE);
if (seq >= forceDeliveryCount.longValue())
{
@@ -451,7 +451,7 @@
// This is ok - we just ignore the message
return;
}
-
+
ClientMessageInternal messageToHandle = message;
messageToHandle.onReceipt(this);
Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -239,5 +239,16 @@
{
this.bodyInputStream = bodyInputStream;
}
+
+ public void setBuffer(HornetQBuffer buffer)
+ {
+ this.buffer = buffer;
+
+ if (bodyBuffer != null)
+ {
+ bodyBuffer.setBuffer(buffer);
+ }
+ }
+
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -34,13 +34,13 @@
void setFlowControlSize(int flowControlSize);
void onReceipt(ClientConsumerInternal consumer);
-
+
void setLargeMessage(boolean largeMessage);
/**
* Discard unused packets (used on large-message)
*/
- void discardLargeBody();
-
+ void discardLargeBody();
+
void setBuffer(HornetQBuffer buffer);
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -689,6 +689,8 @@
if (consumer != null)
{
ClientMessageInternal clMessage = (ClientMessageInternal)message.getMessage();
+
+ clMessage.setDeliveryCount(message.getDeliveryCount());
if (trace)
{
Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -68,7 +68,7 @@
break;
}
case SESS_RECEIVE_MSG:
- {
+ {
SessionReceiveMessage message = (SessionReceiveMessage) packet;
clientSession.handleReceiveMessage(message.getConsumerID(), message);
Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -1098,7 +1098,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.remoting.spi.HornetQBuffer#readUTF()
*/
- public String readUTF() throws Exception
+ public String readUTF()
{
return UTF8Util.readUTF(this);
}
@@ -1172,7 +1172,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.remoting.spi.HornetQBuffer#writeUTF(java.lang.String)
*/
- public void writeUTF(final String utf) throws Exception
+ public void writeUTF(final String utf)
{
throw new IllegalAccessError(READ_ONLY_ERROR_MESSAGE);
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -420,7 +420,7 @@
bb.writeInt(fileId);
bb.writeLong(id);
bb.writeInt(record.getEncodeSize());
- bb.writeByte(recordType);
+ bb.writeByte(recordType);
record.encode(bb);
bb.writeInt(size);
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/Message.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/Message.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -69,16 +69,27 @@
HornetQBuffer getBodyBuffer();
+
+ //Should the following methods really be on the public API?
+
void decodeFromBuffer(HornetQBuffer buffer);
HornetQBuffer encodeToBuffer();
int getEndOfMessagePosition();
+ // void setEndOfBodyPosition();
+
int getEndOfBodyPosition();
- void forceCopy();
+ void checkCopy();
+ void bodyChanged();
+
+ void resetCopied();
+
+// void resetEndOfBodyPosition();
+
// Properties
// ------------------------------------------------------------------
Modified: branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -15,6 +15,7 @@
import static org.hornetq.utils.DataConstants.SIZE_BOOLEAN;
import static org.hornetq.utils.DataConstants.SIZE_BYTE;
+import static org.hornetq.utils.DataConstants.SIZE_INT;
import static org.hornetq.utils.DataConstants.SIZE_LONG;
import java.nio.ByteBuffer;
@@ -73,7 +74,7 @@
public static final SimpleString HDR_FROM_CLUSTER = new SimpleString("_HQ_FROM_CLUSTER");
public static final SimpleString HDR_LAST_VALUE_NAME = new SimpleString("_HQ_LVQ_NAME");
-
+
// Attributes ----------------------------------------------------
protected long messageID;
@@ -94,9 +95,9 @@
protected byte priority;
protected HornetQBuffer buffer;
-
- private int encodeSize;
-
+
+ // private int encodeSize;
+
// Constructors --------------------------------------------------
protected MessageImpl()
@@ -126,61 +127,76 @@
this.expiration = expiration;
this.timestamp = timestamp;
this.priority = priority;
- createBody(initialMessageBufferSize);
+ createBody(initialMessageBufferSize);
}
-
+
protected MessageImpl(final long messageID, final int initialMessageBufferSize)
{
this();
this.messageID = messageID;
createBody(initialMessageBufferSize);
}
-
+
private void createBody(final int initialMessageBufferSize)
{
buffer = HornetQChannelBuffers.dynamicBuffer(initialMessageBufferSize);
-
- //There's a bug in netty which means a dynamic buffer won't resize until you write a byte
+
+ // There's a bug in netty which means a dynamic buffer won't resize until you write a byte
buffer.writeByte((byte)0);
-
+
int limit = PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT;
-
+
buffer.setIndex(limit, limit);
-
- endOfBodyPosition = -1;
+
+ //endOfBodyPosition = limit;
}
// Message implementation ----------------------------------------
public int getEncodeSize()
+ {
+ int headersPropsSize = this.getHeadersAndPropertiesEncodeSize();
+
+ int bodyPos = this.endOfBodyPosition == -1 ? buffer.writerIndex() : this.endOfBodyPosition;
+
+ int bodySize = bodyPos - PacketImpl.PACKET_HEADERS_SIZE - DataConstants.SIZE_INT;
+
+ return SIZE_INT + bodySize + SIZE_INT + headersPropsSize;
+ }
+
+ public int getHeadersAndPropertiesEncodeSize()
{
- return encodeSize;
+ return SIZE_LONG + /* Destination */SimpleString.sizeofString(destination) +
+ /* Type */SIZE_BYTE +
+ /* Durable */SIZE_BOOLEAN +
+ /* Expiration */SIZE_LONG +
+ /* Timestamp */SIZE_LONG +
+ /* Priority */SIZE_BYTE +
+ /* PropertySize and Properties */properties.getEncodeSize();
}
-
+
public void encodeHeadersAndProperties(final HornetQBuffer buffer)
- {
- buffer.writeLong(messageID);
- buffer.writeSimpleString(destination);
+ {
+ buffer.writeLong(messageID);
+ buffer.writeSimpleString(destination);
buffer.writeByte(type);
buffer.writeBoolean(durable);
buffer.writeLong(expiration);
buffer.writeLong(timestamp);
buffer.writeByte(priority);
- properties.encode(buffer);
- encodeSize = buffer.writerIndex() - PacketImpl.PACKET_HEADERS_SIZE;
+ properties.encode(buffer);
}
-
+
public void decodeHeadersAndProperties(final HornetQBuffer buffer)
- {
- messageID = buffer.readLong();
- destination = buffer.readSimpleString();
+ {
+ messageID = buffer.readLong();
+ destination = buffer.readSimpleString();
type = buffer.readByte();
durable = buffer.readBoolean();
expiration = buffer.readLong();
timestamp = buffer.readLong();
priority = buffer.readByte();
- properties.decode(buffer);
- encodeSize = buffer.readerIndex() - PacketImpl.PACKET_HEADERS_SIZE;
+ properties.decode(buffer);
}
public long getMessageID()
@@ -196,6 +212,8 @@
public void setDestination(final SimpleString destination)
{
this.destination = destination;
+
+ bufferValid = false;
}
public byte getType()
@@ -211,6 +229,8 @@
public void setDurable(final boolean durable)
{
this.durable = durable;
+
+ bufferValid = false;
}
public long getExpiration()
@@ -221,6 +241,8 @@
public void setExpiration(final long expiration)
{
this.expiration = expiration;
+
+ bufferValid = false;
}
public long getTimestamp()
@@ -231,6 +253,8 @@
public void setTimestamp(final long timestamp)
{
this.timestamp = timestamp;
+
+ bufferValid = false;
}
public byte getPriority()
@@ -241,6 +265,8 @@
public void setPriority(final byte priority)
{
this.priority = priority;
+
+ bufferValid = false;
}
public boolean isExpired()
@@ -252,7 +278,7 @@
return System.currentTimeMillis() - expiration >= 0;
}
-
+
public Map<String, Object> toMap()
{
Map<String, Object> map = new HashMap<String, Object>();
@@ -277,46 +303,64 @@
public void putBooleanProperty(final SimpleString key, final boolean value)
{
properties.putBooleanProperty(key, value);
+
+ bufferValid = false;
}
public void putByteProperty(final SimpleString key, final byte value)
{
properties.putByteProperty(key, value);
+
+ bufferValid = false;
}
public void putBytesProperty(final SimpleString key, final byte[] value)
{
properties.putBytesProperty(key, value);
+
+ bufferValid = false;
}
public void putShortProperty(final SimpleString key, final short value)
{
properties.putShortProperty(key, value);
+
+ bufferValid = false;
}
public void putIntProperty(final SimpleString key, final int value)
{
properties.putIntProperty(key, value);
+
+ bufferValid = false;
}
public void putLongProperty(final SimpleString key, final long value)
{
properties.putLongProperty(key, value);
+
+ bufferValid = false;
}
public void putFloatProperty(final SimpleString key, final float value)
{
properties.putFloatProperty(key, value);
+
+ bufferValid = false;
}
public void putDoubleProperty(final SimpleString key, final double value)
{
properties.putDoubleProperty(key, value);
+
+ bufferValid = false;
}
public void putStringProperty(final SimpleString key, final SimpleString value)
{
properties.putSimpleStringProperty(key, value);
+
+ bufferValid = false;
}
public void putObjectProperty(final SimpleString key, final Object value) throws PropertyConversionException
@@ -324,10 +368,10 @@
if (value == null)
{
// This is ok - when we try to read the same key it will return null too
- return;
+
+ properties.removeProperty(key);
}
-
- if (value instanceof Boolean)
+ else if (value instanceof Boolean)
{
properties.putBooleanProperty(key, (Boolean)value);
}
@@ -363,61 +407,85 @@
{
throw new PropertyConversionException(value.getClass() + " is not a valid property type");
}
+
+ bufferValid = false;
}
public void putObjectProperty(final String key, final Object value) throws PropertyConversionException
{
putObjectProperty(new SimpleString(key), value);
+
+ bufferValid = false;
}
public void putBooleanProperty(final String key, final boolean value)
{
properties.putBooleanProperty(new SimpleString(key), value);
+
+ bufferValid = false;
}
public void putByteProperty(final String key, final byte value)
{
properties.putByteProperty(new SimpleString(key), value);
+
+ bufferValid = false;
}
public void putBytesProperty(final String key, final byte[] value)
{
properties.putBytesProperty(new SimpleString(key), value);
+
+ bufferValid = false;
}
public void putShortProperty(final String key, final short value)
{
properties.putShortProperty(new SimpleString(key), value);
+
+ bufferValid = false;
}
public void putIntProperty(final String key, final int value)
{
properties.putIntProperty(new SimpleString(key), value);
+
+ bufferValid = false;
}
public void putLongProperty(final String key, final long value)
{
properties.putLongProperty(new SimpleString(key), value);
+
+ bufferValid = false;
}
public void putFloatProperty(final String key, final float value)
{
properties.putFloatProperty(new SimpleString(key), value);
+
+ bufferValid = false;
}
public void putDoubleProperty(final String key, final double value)
{
properties.putDoubleProperty(new SimpleString(key), value);
+
+ bufferValid = false;
}
public void putStringProperty(final String key, final String value)
{
properties.putSimpleStringProperty(new SimpleString(key), new SimpleString(value));
+
+ bufferValid = false;
}
public void putTypedProperties(final TypedProperties otherProps)
{
properties.putTypedProperties(otherProps);
+
+ bufferValid = false;
}
public Object getObjectProperty(final SimpleString key)
@@ -541,11 +609,15 @@
public Object removeProperty(final SimpleString key)
{
+ bufferValid = false;
+
return properties.removeProperty(key);
}
public Object removeProperty(final String key)
{
+ bufferValid = false;
+
return properties.removeProperty(new SimpleString(key));
}
@@ -573,32 +645,22 @@
{
return buffer;
}
-
- public void setBuffer(HornetQBuffer buffer)
- {
- this.buffer = buffer;
-
- if (bodyBuffer != null)
- {
- bodyBuffer.setBuffer(buffer);
- }
- }
public BodyEncoder getBodyEncoder()
{
return new DecodingContext();
}
-
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
-
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
-
+
private class DecodingContext implements BodyEncoder
{
private int lastPos = 0;
@@ -628,151 +690,185 @@
return size;
}
}
-
- //FIXME - all this stuff only used by large messages, move it!
-
- public int getHeadersAndPropertiesEncodeSize()
- {
- return SIZE_LONG + /* Destination */SimpleString.sizeofString(destination) +
- /* Type */SIZE_BYTE +
- /* Durable */SIZE_BOOLEAN +
- /* Expiration */SIZE_LONG +
- /* Timestamp */SIZE_LONG +
- /* Priority */SIZE_BYTE +
- /* PropertySize and Properties */properties.getEncodeSize();
- }
-
-
-
-
-
- private ResetLimitWrappedHornetQBuffer bodyBuffer;
+
+ protected ResetLimitWrappedHornetQBuffer bodyBuffer;
+
public HornetQBuffer getBodyBuffer()
{
if (bodyBuffer == null)
{
if (buffer instanceof LargeMessageBuffer == false)
{
- bodyBuffer = new ResetLimitWrappedHornetQBuffer(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, buffer);
+ bodyBuffer = new ResetLimitWrappedHornetQBuffer(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT,
+ buffer,
+ this);
}
else
{
return buffer;
}
}
-
+
return bodyBuffer;
}
-
-
-
+
protected boolean bufferValid;
- private int endOfBodyPosition;
-
+ private int endOfBodyPosition = -1;
+
private int endOfMessagePosition;
+
+ private boolean copied = true;
+
+ /*
+ * Copy constructor
+ */
+ protected MessageImpl(final MessageImpl other)
+ {
+ messageID = other.getMessageID();
+ destination = other.getDestination();
+ type = other.getType();
+ durable = other.isDurable();
+ expiration = other.getExpiration();
+ timestamp = other.getTimestamp();
+ priority = other.getPriority();
+ properties = new TypedProperties(other.getProperties());
+ // Note, this is a shallow copy - does not copy the buffer
+ buffer = other.buffer;
+ this.bufferValid = other.bufferValid;
+ this.endOfBodyPosition = other.endOfBodyPosition;
+ this.endOfMessagePosition = other.endOfMessagePosition;
+ this.copied = other.copied;
+ }
+
+ public void bodyChanged()
+ {
+ // If the body is changed we must copy the buffer otherwise can affect the previously sent message
+ // which might be in the Netty write queue
+ checkCopy();
+
+ bufferValid = false;
+
+ this.endOfBodyPosition = -1;
+ }
+
+ public void checkCopy()
+ {
+ if (!copied)
+ {
+ forceCopy();
+
+ copied = true;
+ }
+ }
- public void forceCopy()
+ public void resetCopied()
{
- // Must copy buffer before sending it
- int wi = buffer.writerIndex();
+ copied = false;
+ }
+
+ private void forceCopy()
+ {
+ // Must copy buffer before sending it
- this.buffer = buffer.copy(0, buffer.capacity());
+ buffer = buffer.copy(0, buffer.capacity());
- this.buffer.setIndex(0, wi);
+ buffer.setIndex(0, this.endOfBodyPosition);
+
+ if (bodyBuffer != null)
+ {
+ bodyBuffer.setBuffer(buffer);
+ }
}
-
+
public int getEndOfMessagePosition()
{
return this.endOfMessagePosition;
}
-
+
public int getEndOfBodyPosition()
{
return this.endOfBodyPosition;
}
- //Encode to journal or paging
+ // Encode to journal or paging
public void encode(HornetQBuffer buff)
{
encodeToBuffer();
buff.writeBytes(buffer, PacketImpl.PACKET_HEADERS_SIZE, endOfMessagePosition - PacketImpl.PACKET_HEADERS_SIZE);
}
-
- //Decode from journal or paging
+
+ // Decode from journal or paging
public void decode(HornetQBuffer buff)
{
int start = buff.readerIndex();
endOfBodyPosition = buff.readInt();
-
+
endOfMessagePosition = buff.getInt(endOfBodyPosition - PacketImpl.PACKET_HEADERS_SIZE + start);
-
- int endPos = endOfMessagePosition + start -
- PacketImpl.PACKET_HEADERS_SIZE;
- this.buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE);
+ int length = endOfMessagePosition - PacketImpl.PACKET_HEADERS_SIZE;
- buff.writeBytes(buffer, start, endPos - start);
-
- decode();
+ buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE);
+
+ buffer.writeBytes(buff, start, length);
+
+ decode();
+
+ buff.readerIndex(start + length);
}
- public HornetQBuffer encodeToBuffer()
- {
- log.info("encoding msg to buffer, valid " + bufferValid);
-
+ //This must be synchronized as it can be called concurrently id the message is being delivered concurently to
+ //many queues - the first caller in this case will actually encode it
+ public synchronized HornetQBuffer encodeToBuffer()
+ {
if (!bufferValid)
- {
+ {
if (endOfBodyPosition == -1)
- {
- //Means sending message for first time
- endOfBodyPosition = buffer.writerIndex();
-
- //write it
- buffer.setInt(PacketImpl.PACKET_HEADERS_SIZE, endOfBodyPosition);
-
- log.info("setting end of body pos as " + endOfBodyPosition);
+ {
+ // Means sending message for first time
+ endOfBodyPosition = buffer.writerIndex();
}
+ // write it
+ buffer.setInt(PacketImpl.PACKET_HEADERS_SIZE, endOfBodyPosition);
+
// Position at end of body and skip past the message end position int
- buffer.writerIndex(endOfBodyPosition + DataConstants.SIZE_INT);
+ buffer.setIndex(0, endOfBodyPosition + DataConstants.SIZE_INT);
encodeHeadersAndProperties(buffer);
// Write end of message position
-
+
this.endOfMessagePosition = buffer.writerIndex();
buffer.setInt(endOfBodyPosition, endOfMessagePosition);
-
+
this.bufferValid = true;
}
-
+
return buffer;
}
public void decode()
{
this.endOfBodyPosition = buffer.getInt(PacketImpl.PACKET_HEADERS_SIZE);
-
+
buffer.readerIndex(this.endOfBodyPosition + DataConstants.SIZE_INT);
this.decodeHeadersAndProperties(buffer);
-
+
this.endOfMessagePosition = buffer.readerIndex();
-
- log.info("decoded end of body pos as " + this.endOfBodyPosition);
this.bufferValid = true;
}
-
+
public void decodeFromBuffer(HornetQBuffer buffer)
{
this.buffer = buffer;
-
+
decode();
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PageImpl.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PageImpl.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -115,7 +115,7 @@
int oldPos = fileBuffer.readerIndex();
if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity() && fileBuffer.getByte(oldPos + messageSize) == END_BYTE)
{
- PagedMessage msg = new PagedMessageImpl();
+ PagedMessage msg = new PagedMessageImpl();
msg.decode(fileBuffer);
byte b = fileBuffer.readByte();
if (b != END_BYTE)
@@ -124,7 +124,7 @@
// constraint was already checked
throw new IllegalStateException("Internal error, it wasn't possible to locate END_BYTE " + b);
}
- messages.add(msg);
+ messages.add(msg);
}
else
{
@@ -147,15 +147,11 @@
public void write(final PagedMessage message) throws Exception
{
- log.info("encode size is " + message.getEncodeSize());
-
ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + SIZE_RECORD);
HornetQBuffer wrap = HornetQChannelBuffers.wrappedBuffer(buffer);
wrap.clear();
- log.info("wrapped " + wrap.channelBuffer());
-
wrap.writeByte(START_BYTE);
wrap.writeInt(0);
int startIndex = wrap.writerIndex();
@@ -172,8 +168,6 @@
size.addAndGet(buffer.limit());
storageManager.pageWrite(message, pageId);
-
- log.info("wrote page");
}
public void sync() throws Exception
Modified: branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -68,8 +68,7 @@
}
public PagedMessageImpl()
- {
- this(new ServerMessageImpl());
+ {
}
public ServerMessage getMessage(final StorageManager storage)
@@ -109,11 +108,10 @@
{
buffer.readInt(); // This value is only used on LargeMessages for now
- message = new ServerMessageImpl();
+ message = new ServerMessageImpl(-1, 50);
message.decode(buffer);
}
-
}
public void encode(final HornetQBuffer buffer)
Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -13,20 +13,20 @@
package org.hornetq.core.remoting.impl.wireformat;
-import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.Message;
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.utils.DataConstants;
/**
* A MessagePacket
*
- * @author tim
+ * @author Tim Fox
*
*
*/
public abstract class MessagePacket extends PacketImpl
{
+ private static final Logger log = Logger.getLogger(MessagePacket.class);
+
protected Message message;
public MessagePacket(final byte type, final Message message)
@@ -41,41 +41,4 @@
return message;
}
- @Override
- public HornetQBuffer encode(final RemotingConnection connection)
- {
- HornetQBuffer buffer = message.encodeToBuffer();
-
- buffer.setIndex(0, message.getEndOfMessagePosition());
-
- encodeExtraData(buffer);
-
- size = buffer.writerIndex();
-
- //Write standard headers
-
- int len = size - DataConstants.SIZE_INT;
- buffer.setInt(0, len);
- buffer.setByte(DataConstants.SIZE_INT, type);
- buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
-
- //Position reader for reading by Netty
- buffer.readerIndex(0);
-
- return buffer;
- }
-
- @Override
- public void decodeRest(HornetQBuffer buffer)
- {
- //Buffer comes in after having read standard headers and positioned at Beginning of body part
-
- message.decodeFromBuffer(buffer);
-
- decodeExtraData(buffer);
- }
-
- protected abstract void encodeExtraData(HornetQBuffer buffer);
-
- protected abstract void decodeExtraData(HornetQBuffer buffer);
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -78,8 +78,7 @@
buffer.writeByte(journalID);
buffer.writeBoolean(isUpdate);
buffer.writeLong(id);
- buffer.writeByte(recordType);
- log.info("encode size is " + encodingData.getEncodeSize());
+ buffer.writeByte(recordType);
buffer.writeInt(encodingData.getEncodeSize());
encodingData.encode(buffer);
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -17,6 +17,7 @@
import org.hornetq.core.client.impl.ClientMessageImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.Message;
+import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.utils.DataConstants;
/**
@@ -43,7 +44,12 @@
this.deliveryCount = deliveryCount;
- message.forceCopy();
+ //If the message hasn't already been copied when the headers/properties/body was changed since last send
+ //(which will prompt an invalidate(), which will cause a copy if not copied already)
+ //Then the message needs to be copied before delivering - the previous send may be in the Netty write queue
+ //so we can't just use the same buffer. Also we can't just duplicate, since the extra data (consumerID, deliveryCount)
+ //may well be different on different calls
+ //message.forceCopy();
}
public SessionReceiveMessage()
@@ -67,31 +73,55 @@
// Protected -----------------------------------------------------
- protected void encodeExtraData(HornetQBuffer buffer)
+ @Override
+ public HornetQBuffer encode(final RemotingConnection connection)
{
+ //message.setEndOfBodyPosition();
+
+ HornetQBuffer orig = message.encodeToBuffer();
+
+ //Now we must copy this buffer, before sending to Netty, as it could be concurrently delivered to many consumers
+
+ HornetQBuffer buffer = orig.copy(0, orig.capacity());
+
+ buffer.setIndex(0, message.getEndOfMessagePosition());
+
buffer.writeLong(consumerID);
buffer.writeInt(deliveryCount);
+
+ size = buffer.writerIndex();
+
+ //Write standard headers
+
+ int len = size - DataConstants.SIZE_INT;
+ buffer.setInt(0, len);
+ buffer.setByte(DataConstants.SIZE_INT, type);
+ buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
+
+ //Position reader for reading by Netty
+ buffer.readerIndex(0);
+
+ return buffer;
}
- protected void decodeExtraData(HornetQBuffer buffer)
- {
- consumerID = buffer.readLong();
- deliveryCount = buffer.readInt();
- }
@Override
- public void decodeRest(HornetQBuffer buffer)
+ public void decode(HornetQBuffer buffer)
{
- //Buffer comes in after having read standard headers and positioned at Beginning of body part
-
+ channelID = buffer.readLong();
+
message.decodeFromBuffer(buffer);
- decodeExtraData(buffer);
+ consumerID = buffer.readLong();
+ deliveryCount = buffer.readInt();
+
+ size = buffer.readerIndex();
+
//Need to position buffer for reading
- buffer.setIndex(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, message.getEndOfBodyPosition());
+ buffer.setIndex(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, message.getEndOfBodyPosition());
}
-
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -16,7 +16,9 @@
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.Message;
+import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.utils.DataConstants;
/**
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
@@ -43,7 +45,12 @@
this.requiresResponse = requiresResponse;
- message.forceCopy();
+ //If the message hasn't already been copied when the headers/properties/body was changed since last send
+ //(which will prompt an invalidate(), which will cause a copy if not copied already)
+ //Then the message needs to be copied before sending - the previous send may be in the Netty write queue
+ //so we can't just use the same buffer. Also we can't just duplicate, since the extra data (requiresResponse)
+ //may be different on different calls
+ message.checkCopy();
}
public SessionSendMessage()
@@ -62,16 +69,49 @@
// Protected -----------------------------------------------------
- protected void encodeExtraData(HornetQBuffer buffer)
+ @Override
+ public HornetQBuffer encode(final RemotingConnection connection)
{
+ //this isn't right when forwarding a message that has been already received - because writerindex will
+ //be pointing at end of message
+
+ HornetQBuffer orig = message.encodeToBuffer();
+
+ //FIXME - for now we are copying due to concurrent sends to many bridges on the server
+
+ HornetQBuffer buffer = orig.copy(0, orig.capacity());
+
+ buffer.setIndex(0, message.getEndOfMessagePosition());
+
buffer.writeBoolean(requiresResponse);
+
+ size = buffer.writerIndex();
+
+ //Write standard headers
+
+ int len = size - DataConstants.SIZE_INT;
+ buffer.setInt(0, len);
+ buffer.setByte(DataConstants.SIZE_INT, type);
+ buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
+
+ //Position reader for reading by Netty
+ buffer.readerIndex(0);
+
+ message.resetCopied();
+
+ return buffer;
}
- protected void decodeExtraData(HornetQBuffer buffer)
+ @Override
+ public void decodeRest(HornetQBuffer buffer)
{
- requiresResponse = buffer.readBoolean();
+ //Buffer comes in after having read standard headers and positioned at Beginning of body part
+
+ message.decodeFromBuffer(buffer);
+
+ requiresResponse = buffer.readBoolean();
}
-
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -178,7 +178,7 @@
}
public void route(final ServerMessage message, final RoutingContext context)
- {
+ {
byte[] ids = message.getBytesProperty(idsHeaderName);
if (ids == null)
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -184,12 +184,13 @@
}
public HandleStatus handle(final MessageReference ref) throws Exception
- {
+ {
if (availableCredits != null && availableCredits.get() <= 0)
- {
+ {
+
return HandleStatus.BUSY;
}
-
+
lock.lock();
try
@@ -210,7 +211,7 @@
}
final ServerMessage message = ref.getMessage();
-
+
if (filter != null && !filter.match(message))
{
return HandleStatus.NO_MATCH;
@@ -344,17 +345,23 @@
{
public void run()
{
- promptDelivery(false);
+ try
+ {
+ promptDelivery(false);
- ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(),
- 50);
+ ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
- forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
- forcedDeliveryMessage.setDestination(messageQueue.getName());
+ forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
+ forcedDeliveryMessage.setDestination(messageQueue.getName());
- final SessionReceiveMessage packet = new SessionReceiveMessage(id, forcedDeliveryMessage, 0);
+ final SessionReceiveMessage packet = new SessionReceiveMessage(id, forcedDeliveryMessage, 0);
- channel.send(packet);
+ channel.send(packet);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to send forced delivery message", e);
+ }
}
});
}
@@ -409,7 +416,8 @@
}
public void receiveCredits(final int credits) throws Exception
- {
+ {
+
if (credits == -1)
{
// No flow control
@@ -449,7 +457,7 @@
// Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly
// acknowledged
-
+
MessageReference ref;
do
{
@@ -576,13 +584,13 @@
* @param message
*/
private void deliverStandardMessage(final MessageReference ref, final ServerMessage message)
- {
+ {
final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount());
-
+
channel.send(packet);
-
+
if (availableCredits != null)
- {
+ {
availableCredits.addAndGet(-packet.getPacketSize());
}
@@ -673,18 +681,19 @@
largeMessage.encodeHeadersAndProperties(headerBuffer);
SessionReceiveLargeMessage initialPacket = new SessionReceiveLargeMessage(id,
- headerBuffer.toByteBuffer().array(),
+ headerBuffer.toByteBuffer()
+ .array(),
largeMessage.getLargeBodySize(),
ref.getDeliveryCount());
context = largeMessage.getBodyEncoder();
context.open();
-
+
sentInitialPacket = true;
channel.send(initialPacket);
-
+
if (availableCredits != null)
{
availableCredits.addAndGet(-initialPacket.getPacketSize());
@@ -712,16 +721,16 @@
SessionReceiveContinuationMessage chunk = createChunkSend(context);
int chunkLen = chunk.getBody().length;
-
+
channel.send(chunk);
-
+
if (trace)
{
trace("deliverLargeMessage: Sending " + chunk.getPacketSize() +
" availableCredits now is " +
availableCredits);
}
-
+
if (availableCredits != null)
{
availableCredits.addAndGet(-chunk.getPacketSize());
@@ -847,7 +856,7 @@
{
MessageReference ref = iterator.next();
try
- {
+ {
HandleStatus status = handle(ref);
if (status == HandleStatus.BUSY)
{
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -17,7 +17,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.PropertyConversionException;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
@@ -71,17 +70,7 @@
*/
protected ServerMessageImpl(final ServerMessageImpl other)
{
- this();
- messageID = other.getMessageID();
- destination = other.getDestination();
- type = other.getType();
- durable = other.isDurable();
- expiration = other.getExpiration();
- timestamp = other.getTimestamp();
- priority = other.getPriority();
- properties = new TypedProperties(other.getProperties());
- //Note, this is a shallow copy - does not copy the buffer
- buffer = other.buffer;
+ super(other);
}
public void setMessageID(final long id)
@@ -310,219 +299,7 @@
{
// We first set the message id - this needs to be set on the buffer since this buffer will be re-used
- //log.info(System.identityHashCode(this) + " encoded message id " + messageID + " etb " + this.encodedToBuffer);
-
buffer.setLong(buffer.getInt(PacketImpl.PACKET_HEADERS_SIZE) + DataConstants.SIZE_INT, messageID);
}
- @Override
- public void putBooleanProperty(SimpleString key, boolean value)
- {
- super.putBooleanProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putBooleanProperty(String key, boolean value)
- {
- super.putBooleanProperty(key, value);
-
- bufferValid = false;;
- }
-
- @Override
- public void putByteProperty(SimpleString key, byte value)
- {
- super.putByteProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putByteProperty(String key, byte value)
- {
- super.putByteProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putBytesProperty(SimpleString key, byte[] value)
- {
- super.putBytesProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putBytesProperty(String key, byte[] value)
- {
- super.putBytesProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putDoubleProperty(SimpleString key, double value)
- {
- super.putDoubleProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putDoubleProperty(String key, double value)
- {
- super.putDoubleProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putFloatProperty(SimpleString key, float value)
- {
- super.putFloatProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putFloatProperty(String key, float value)
- {
- super.putFloatProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putIntProperty(SimpleString key, int value)
- {
- super.putIntProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putIntProperty(String key, int value)
- {
- super.putIntProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putLongProperty(SimpleString key, long value)
- {
- super.putLongProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putLongProperty(String key, long value)
- {
- super.putLongProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putObjectProperty(SimpleString key, Object value) throws PropertyConversionException
- {
- super.putObjectProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putObjectProperty(String key, Object value) throws PropertyConversionException
- {
- super.putObjectProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putShortProperty(SimpleString key, short value)
- {
- super.putShortProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putShortProperty(String key, short value)
- {
- super.putShortProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putStringProperty(SimpleString key, SimpleString value)
- {
- super.putStringProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putStringProperty(String key, String value)
- {
- super.putStringProperty(key, value);
-
- bufferValid = false;
- }
-
- @Override
- public void putTypedProperties(TypedProperties otherProps)
- {
- super.putTypedProperties(otherProps);
-
- bufferValid = false;
- }
-
- @Override
- public void setDestination(SimpleString destination)
- {
- super.setDestination(destination);
-
- bufferValid = false;
- }
-
- @Override
- public void setDurable(boolean durable)
- {
- super.setDurable(durable);
-
- bufferValid = false;
- }
-
- @Override
- public void setExpiration(long expiration)
- {
- super.setExpiration(expiration);
-
- bufferValid = false;
- }
-
- @Override
- public void setPriority(byte priority)
- {
- super.setPriority(priority);
-
- bufferValid = false;
- }
-
- @Override
- public void setTimestamp(long timestamp)
- {
- super.setTimestamp(timestamp);
-
- bufferValid = false;
- }
-
-
-
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -1434,7 +1434,7 @@
public void handleSendLargeMessage(final SessionSendLargeMessage packet)
{
-
+
// need to create the LargeMessage before continue
long id = storageManager.generateUniqueID();
@@ -1454,7 +1454,7 @@
}
public void handleSend(final SessionSendMessage packet)
- {
+ {
Packet response = null;
ServerMessage message = (ServerMessage)packet.getMessage();
@@ -1574,7 +1574,7 @@
final CreditManagerHolder holder = this.getCreditManagerHolder(address);
int credits = packet.getCredits();
-
+
int gotCredits = holder.manager.acquireCredits(credits, new CreditsAvailableRunnable()
{
public boolean run(final int credits)
@@ -1594,7 +1594,7 @@
}
}
});
-
+
if (gotCredits > 0)
{
sendProducerCredits(holder, gotCredits, address);
@@ -1942,7 +1942,7 @@
private void sendProducerCredits(final CreditManagerHolder holder, final int credits, final SimpleString address)
{
holder.outstandingCredits += credits;
-
+
Packet packet = new SessionProducerCreditsMessage(credits, address, -1);
channel.send(packet);
@@ -1967,7 +1967,7 @@
}
if (tx == null || autoCommitSends)
- {
+ {
postOffice.route(msg);
}
else
Modified: branches/20-optimisation/src/main/org/hornetq/utils/UTF8Util.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/utils/UTF8Util.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/src/main/org/hornetq/utils/UTF8Util.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -41,7 +41,7 @@
private static ThreadLocal<SoftReference<StringUtilBuffer>> currenBuffer = new ThreadLocal<SoftReference<StringUtilBuffer>>();
- public static void saveUTF(final HornetQBuffer out, final String str) throws IOException
+ public static void saveUTF(final HornetQBuffer out, final String str)
{
StringUtilBuffer buffer = getThreadLocalBuffer();
@@ -107,7 +107,7 @@
}
}
- public static String readUTF(final HornetQBuffer input) throws IOException
+ public static String readUTF(final HornetQBuffer input)
{
StringUtilBuffer buffer = getThreadLocalBuffer();
Modified: branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java
===================================================================
--- branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -158,11 +158,13 @@
browser = session.createBrowser(queue1);
en = browser.getEnumeration();
+ log.info("browsing");
+
count = 0;
while (en.hasMoreElements())
{
Message mess = (Message)en.nextElement();
- log.trace("message:" + mess);
+ log.info("message:" + mess);
count++;
}
Modified: branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java
===================================================================
--- branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -37,7 +37,6 @@
// Public --------------------------------------------------------
-
public void testSimpleRollback() throws Exception
{
// send a message
@@ -45,44 +44,44 @@
try
{
- conn = cf.createConnection();
- Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- s.createProducer(queue1).send(s.createTextMessage("one"));
+ conn = cf.createConnection();
+ Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ s.createProducer(queue1).send(s.createTextMessage("one"));
- s.close();
+ s.close();
- s = conn.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer c = s.createConsumer(queue1);
- conn.start();
- Message m = c.receive(1000);
- assertNotNull(m);
+ s = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer c = s.createConsumer(queue1);
+ conn.start();
+ Message m = c.receive(1000);
+ assertNotNull(m);
- assertEquals("one", ((TextMessage)m).getText());
- assertFalse(m.getJMSRedelivered());
- assertEquals(1, m.getIntProperty("JMSXDeliveryCount"));
+ assertEquals("one", ((TextMessage)m).getText());
+ assertFalse(m.getJMSRedelivered());
+ assertEquals(1, m.getIntProperty("JMSXDeliveryCount"));
- s.rollback();
+ s.rollback();
- // get the message again
- m = c.receive(1000);
- assertNotNull(m);
-
- assertTrue(m.getJMSRedelivered());
- assertEquals(2, m.getIntProperty("JMSXDeliveryCount"));
+ // get the message again
+ m = c.receive(1000);
+ assertNotNull(m);
- conn.close();
+ assertTrue(m.getJMSRedelivered());
+ assertEquals(2, m.getIntProperty("JMSXDeliveryCount"));
- Integer i = getMessageCountForQueue("Queue1");
+ conn.close();
+ Integer i = getMessageCountForQueue("Queue1");
+
assertEquals(1, i.intValue());
}
finally
{
- if (conn != null)
- {
- conn.close();
- }
- removeAllMessages(queue1.getQueueName(), true);
+ if (conn != null)
+ {
+ conn.close();
+ }
+ removeAllMessages(queue1.getQueueName(), true);
}
}
@@ -107,11 +106,11 @@
TextMessage mRec1 = (TextMessage)consumer1.receive(2000);
assertNotNull(mRec1);
-
+
assertEquals("igloo", mRec1.getText());
assertFalse(mRec1.getJMSRedelivered());
- sess1.rollback(); //causes redelivery for session
+ sess1.rollback(); // causes redelivery for session
mRec1 = (TextMessage)consumer1.receive(2000);
assertEquals("igloo", mRec1.getText());
@@ -128,7 +127,6 @@
}
}
-
/** Test redelivery works ok for Topic */
public void testRedeliveredTopic() throws Exception
{
@@ -187,8 +185,10 @@
MessageConsumer consumer = sess.createConsumer(topic1);
conn.start();
+ log.info("sending message first time");
TextMessage mSent = sess.createTextMessage("igloo");
producer.send(mSent);
+ log.info("sent message first time");
sess.commit();
@@ -197,12 +197,15 @@
sess.commit();
+ log.info("sending message again");
mSent.setText("rollback");
producer.send(mSent);
+ log.info("sent message again");
sess.commit();
mRec = (TextMessage)consumer.receive(2000);
+ assertEquals("rollback", mRec.getText());
sess.rollback();
TextMessage mRec2 = (TextMessage)consumer.receive(2000);
@@ -243,7 +246,7 @@
final int NUM_MESSAGES = 10;
- //Send some messages
+ // Send some messages
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
@@ -283,7 +286,7 @@
final int NUM_MESSAGES = 10;
- //Send some messages
+ // Send some messages
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
@@ -296,7 +299,8 @@
while (true)
{
Message m = consumer.receive(500);
- if (m == null) break;
+ if (m == null)
+ break;
count++;
}
@@ -335,7 +339,7 @@
final int NUM_MESSAGES = 10;
- //Send some messages
+ // Send some messages
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
@@ -345,7 +349,8 @@
while (true)
{
Message m = consumer.receive(500);
- if (m == null) break;
+ if (m == null)
+ break;
count++;
}
@@ -378,8 +383,6 @@
}
-
-
/*
* Send some messages in a transacted session.
* Rollback the session.
@@ -402,7 +405,7 @@
final int NUM_MESSAGES = 10;
- //Send some messages
+ // Send some messages
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
@@ -503,7 +506,7 @@
assertEquals(1, tm.getIntProperty("JMSXDeliveryCount"));
sess.rollback();
-
+
sess.close();
Session sess2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -537,7 +540,6 @@
MessageConsumer consumer = sess.createConsumer(queue1);
conn.start();
-
TextMessage mSent = sess.createTextMessage("igloo");
producer.send(mSent);
log.trace("sent1");
@@ -560,7 +562,7 @@
log.trace("Receiving 2");
mRec = (TextMessage)consumer.receive(1000);
assertNotNull(mRec);
-
+
log.trace("Received 2");
assertNotNull(mRec);
assertEquals("rollback", mRec.getText());
@@ -599,7 +601,7 @@
final int NUM_MESSAGES = 10;
- //Send some messages
+ // Send some messages
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
@@ -617,9 +619,6 @@
}
}
-
-
-
/**
* Send some messages in transacted session. Commit.
* Verify message are received by consumer.
@@ -641,7 +640,7 @@
final int NUM_MESSAGES = 10;
- //Send some messages
+ // Send some messages
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
@@ -654,7 +653,8 @@
while (true)
{
Message m = consumer.receive(500);
- if (m == null) break;
+ if (m == null)
+ break;
count++;
}
@@ -671,7 +671,6 @@
}
-
/**
* Test IllegateStateException is thrown if commit is called on a non-transacted session
*/
@@ -706,7 +705,6 @@
}
}
-
/**
* Send some messages.
* Receive them in a transacted session.
@@ -733,7 +731,7 @@
final int NUM_MESSAGES = 10;
- //Send some messages
+ // Send some messages
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
@@ -744,7 +742,8 @@
while (true)
{
Message m = consumer.receive(500);
- if (m == null) break;
+ if (m == null)
+ break;
count++;
}
@@ -752,9 +751,9 @@
conn.stop();
consumer.close();
-
+
conn.close();
-
+
conn = cf.createConnection();
consumerSess = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
@@ -762,15 +761,15 @@
conn.start();
count = 0;
-
while (true)
{
Message m = consumer.receive(500);
- if (m == null) break;
+ if (m == null)
+ break;
count++;
}
-
+
assertEquals(NUM_MESSAGES, count);
}
finally
@@ -783,9 +782,6 @@
}
}
-
-
-
/**
* Send some messages.
* Receive them in a transacted session.
@@ -810,7 +806,7 @@
final int NUM_MESSAGES = 10;
- //Send some messages
+ // Send some messages
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
@@ -821,7 +817,8 @@
while (true)
{
Message m = consumer.receive(500);
- if (m == null) break;
+ if (m == null)
+ break;
count++;
}
@@ -854,8 +851,6 @@
}
-
-
/*
* Send some messages in a transacted session.
* Rollback the session.
@@ -878,7 +873,7 @@
final int NUM_MESSAGES = 10;
- //Send some messages
+ // Send some messages
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
@@ -900,7 +895,6 @@
}
}
-
/*
* Test IllegateStateException is thrown if rollback is
* called on a non-transacted session
@@ -938,7 +932,6 @@
}
}
-
/*
* Send some messages.
* Receive them in a transacted session.
@@ -965,7 +958,7 @@
final int NUM_MESSAGES = 10;
- //Send some messages
+ // Send some messages
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
@@ -976,7 +969,8 @@
while (true)
{
Message m = consumer.receive(500);
- if (m == null) break;
+ if (m == null)
+ break;
count++;
}
@@ -999,7 +993,8 @@
while (true)
{
Message m = consumer.receive(500);
- if (m == null) break;
+ if (m == null)
+ break;
count++;
}
@@ -1017,7 +1012,6 @@
}
-
/*
* Send multiple messages in multiple contiguous sessions
*/
@@ -1039,7 +1033,7 @@
final int NUM_MESSAGES = 10;
final int NUM_TX = 10;
- //Send some messages
+ // Send some messages
for (int j = 0; j < NUM_TX; j++)
{
@@ -1056,7 +1050,8 @@
while (true)
{
Message m = consumer.receive(500);
- if (m == null) break;
+ if (m == null)
+ break;
count++;
m.acknowledge();
}
@@ -1080,5 +1075,3 @@
// Inner classes -------------------------------------------------
}
-
-
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/AckBatchSizeTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/AckBatchSizeTest.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/AckBatchSizeTest.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -78,21 +78,18 @@
cp.send(sendSession.createClientMessage(false));
}
- log.info("sent messages");
-
ClientConsumer consumer = session.createConsumer(queueA);
session.start();
for (int i = 0; i < numMessages - 1; i++)
{
ClientMessage m = consumer.receive(5000);
- log.info("got message " + i);
m.acknowledge();
}
ClientMessage m = consumer.receive(5000);
Queue q = (Queue)server.getPostOffice().getBinding(queueA).getBindable();
- assertEquals(1, q.getDeliveringCount());
+ assertEquals(100, q.getDeliveringCount());
m.acknowledge();
assertEquals(0, q.getDeliveringCount());
sendSession.close();
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -105,7 +105,7 @@
log.info("sent messages");
ClientConsumer consumer = session.createConsumer(QUEUE);
-
+
session.start();
for (int i = 0; i < numMessages; i++)
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -26,6 +26,7 @@
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
@@ -39,6 +40,8 @@
*/
public class DeadLetterAddressTest extends UnitTestCase
{
+ private static final Logger log = Logger.getLogger(DeadLetterAddressTest.class);
+
private HornetQServer server;
private ClientSession clientSession;
@@ -236,7 +239,9 @@
for (int i = 0; i < deliveryAttempt; i++)
{
ClientMessage m = clientConsumer.receive(500);
- assertNotNull(m);
+ assertNotNull(m);
+ log.info("i is " + i);
+ log.info("delivery cout is " +m.getDeliveryCount());
assertEquals(i + 1, m.getDeliveryCount());
m.acknowledge();
clientSession.rollback();
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -85,12 +85,8 @@
int bodySize = message.getBodySize();
- log.info("body size is " + bodySize);
-
for (int i = 0; i < 10; i++)
{
- log.info("sending " + i);
-
ClientMessage received = sendAndReceive(message);
assertNotNull(received);
@@ -107,10 +103,12 @@
{
ClientMessage message = session.createClientMessage(false);
+ String body = RandomUtil.randomString();
+
for (int i = 0; i < 10; i++)
{
- log.info("iteration " + i);
- final String body = RandomUtil.randomString();
+ //Make the body a bit longer each time
+ body += "XX";
message.getBodyBuffer().writeString(body);
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -23,7 +23,6 @@
import junit.framework.TestSuite;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
@@ -41,7 +40,6 @@
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.integration.largemessage.LargeMessageTestBase;
import org.hornetq.tests.util.RandomUtil;
-import org.hornetq.utils.DataConstants;
import org.hornetq.utils.SimpleString;
/**
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -64,8 +64,6 @@
startServers();
- log.info("********** started servers");
-
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -110,8 +108,6 @@
this.closeAllSessionFactories();
- log.info("** stopping servers");
-
stopServers(0, 1, 2, 3, 4);
startServers();
@@ -167,33 +163,24 @@
startServers();
- log.info("*** started servers");
-
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
setupSessionFactory(3, isNetty());
setupSessionFactory(4, isNetty());
- log.info("** created session factories");
-
- createQueue(0, "queues.testaddress", "queue0", null, false);
-
+ createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(1, "queues.testaddress", "queue0", null, false);
createQueue(2, "queues.testaddress", "queue0", null, false);
createQueue(3, "queues.testaddress", "queue0", null, false);
createQueue(4, "queues.testaddress", "queue0", null, false);
- log.info("**** created queues");
-
addConsumer(0, 0, "queue0", null);
addConsumer(1, 1, "queue0", null);
addConsumer(2, 2, "queue0", null);
addConsumer(3, 3, "queue0", null);
addConsumer(4, 4, "queue0", null);
- log.info("*** created consumers");
-
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
waitForBindings(2, "queues.testaddress", 1, 1, true);
@@ -206,14 +193,8 @@
waitForBindings(3, "queues.testaddress", 4, 4, false);
waitForBindings(4, "queues.testaddress", 4, 4, false);
- log.info("** sending messages");
-
send(0, "queues.testaddress", 10, false, null);
- log.info("** sent messages");
-
- // this.checkReceive(0, 1, 2, 3, 4);
-
verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
this.verifyNotReceive(0, 1, 2, 3, 4);
@@ -1181,6 +1162,10 @@
waitForBindings(2, "queues.testaddress", 4, 4, false);
waitForBindings(3, "queues.testaddress", 4, 4, false);
waitForBindings(4, "queues.testaddress", 4, 4, false);
+
+ // this.checkReceive(0, 1, 2, 3, 4);
+
+ //Thread.sleep(300000);
verifyReceiveAll(10, 0, 1, 2, 3, 4);
}
@@ -1436,12 +1421,9 @@
closeSessionFactory(3);
stopServers(0, 3);
- log.info("stopped servers");
startServers(3, 0);
-
- log.info("restarted servers");
-
+
Thread.sleep(2000);
setupSessionFactory(0, isNetty());
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -17,6 +17,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQ;
@@ -30,6 +31,8 @@
*/
public class MultiThreadRandomReattachTest extends MultiThreadRandomReattachTestBase
{
+ private static final Logger log = Logger.getLogger(MultiThreadRandomReattachTest.class);
+
@Override
protected void start() throws Exception
{
@@ -47,6 +50,8 @@
@Override
protected void setBody(final ClientMessage message) throws Exception
{
+ //Give each msg a body
+ message.getBodyBuffer().writeBytes(new byte[500]);
}
/* (non-Javadoc)
@@ -55,7 +60,7 @@
@Override
protected boolean checkSize(final ClientMessage message)
{
- return 0 == message.getBodyBuffer().writerIndex();
+ return message.getBodyBuffer().readableBytes() == 500;
}
}
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -167,12 +167,12 @@
final SimpleString address = new SimpleString("Destination " + i);
- ServerMessageImpl implMsg = new ServerMessageImpl();
+ ServerMessageImpl implMsg = new ServerMessageImpl(i, 1000);
implMsg.putStringProperty(new SimpleString("Key"), new SimpleString("This String is worthless!"));
implMsg.setMessageID(i);
- implMsg.setBuffer(HornetQChannelBuffers.wrappedBuffer(bytes));
+ implMsg.getBodyBuffer().writeBytes(bytes);
implMsg.setDestination(address);
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -25,10 +25,12 @@
import org.hornetq.core.paging.impl.PageImpl;
import org.hornetq.core.paging.impl.PagedMessageImpl;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.DataConstants;
import org.hornetq.utils.SimpleString;
/**
@@ -207,9 +209,9 @@
for (int i = 0; i < numberOfElements; i++)
{
- ServerMessage msg = new ServerMessageImpl(i, 1000);
+ ServerMessage msg = new ServerMessageImpl(i, 100);
- for (int j = 0; j < msg.getBodyBuffer().capacity(); j++)
+ for (int j = 0; j < 10; j++)
{
msg.getBodyBuffer().writeByte((byte)'b');
}
@@ -225,8 +227,6 @@
return buffers;
}
-
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -238,5 +238,4 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
-
}
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-24 19:05:15 UTC (rev 8395)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-25 00:45:52 UTC (rev 8396)
@@ -286,6 +286,18 @@
class FakeMessage implements ServerMessage
{
+ public void decode(HornetQBuffer buffer)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void encode(HornetQBuffer buffer)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
public ServerMessage copy() throws Exception
{
// TODO Auto-generated method stub
@@ -322,12 +334,6 @@
}
- public int getEndMessagePosition()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
public int getMemoryEstimate()
{
// TODO Auto-generated method stub
@@ -376,43 +382,43 @@
return false;
}
- public void setEndMessagePosition(int pos)
+ public void setMessageID(long id)
{
// TODO Auto-generated method stub
}
- public void setMessageID(long id)
+ public void setOriginalHeaders(ServerMessage other, boolean expiry)
{
// TODO Auto-generated method stub
}
- public void setNeedsEncoding()
+ public void setPagingStore(PagingStore store)
{
// TODO Auto-generated method stub
}
- public void setOriginalHeaders(ServerMessage other, boolean expiry)
+ public boolean storeIsPaging()
{
// TODO Auto-generated method stub
-
+ return false;
}
- public void setPagingStore(PagingStore store)
+ public void bodyChanged()
{
// TODO Auto-generated method stub
}
- public boolean storeIsPaging()
+ public void checkCopy()
{
// TODO Auto-generated method stub
- return false;
+
}
- public void afterSend()
+ public void clearCopied()
{
// TODO Auto-generated method stub
@@ -430,7 +436,7 @@
return false;
}
- public void decodeFromWire(HornetQBuffer buffer)
+ public void decodeFromBuffer(HornetQBuffer buffer)
{
// TODO Auto-generated method stub
@@ -448,6 +454,12 @@
}
+ public HornetQBuffer encodeToBuffer()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
public HornetQBuffer getBodyBuffer()
{
// TODO Auto-generated method stub
@@ -526,6 +538,18 @@
return 0;
}
+ public int getEndOfBodyPosition()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ public int getEndOfMessagePosition()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
public long getExpiration()
{
// TODO Auto-generated method stub
@@ -670,24 +694,12 @@
return null;
}
- public boolean isBufferWritten()
- {
- // TODO Auto-generated method stub
- return false;
- }
-
public boolean isDurable()
{
// TODO Auto-generated method stub
return false;
}
- public boolean isEncodedToBuffer()
- {
- // TODO Auto-generated method stub
- return false;
- }
-
public boolean isExpired()
{
// TODO Auto-generated method stub
@@ -838,12 +850,6 @@
return null;
}
- public void setBuffer(HornetQBuffer buffer)
- {
- // TODO Auto-generated method stub
-
- }
-
public void setDestination(SimpleString destination)
{
// TODO Auto-generated method stub
@@ -880,91 +886,20 @@
return null;
}
- public void decode(HornetQBuffer buffer)
+ public void resetCopied()
{
// TODO Auto-generated method stub
}
- public void encode(HornetQBuffer buffer)
+ public void setEndOfBodyPosition()
{
// TODO Auto-generated method stub
}
- public void setEncodedToBuffer(boolean encoded)
- {
- // TODO Auto-generated method stub
-
- }
- public void decodeFromBuffer(HornetQBuffer buffer)
- {
- // TODO Auto-generated method stub
-
- }
- public HornetQBuffer encodeToBuffer()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public int getBodySize()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public int getEndOfMessagePosition()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public void saveToOutputStream(OutputStream out) throws HornetQException
- {
- // TODO Auto-generated method stub
-
- }
-
- public void setBodyInputStream(InputStream bodyInputStream)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void setOutputStream(OutputStream out) throws HornetQException
- {
- // TODO Auto-generated method stub
-
- }
-
- public boolean waitOutputStreamCompletion(long timeMilliseconds) throws HornetQException
- {
- // TODO Auto-generated method stub
- return false;
- }
-
- public void beforeDeliver()
- {
- // TODO Auto-generated method stub
-
- }
-
- public void beforeSend()
- {
- // TODO Auto-generated method stub
-
- }
-
- public void forceCopy()
- {
- // TODO Auto-generated method stub
-
- }
-
-
}
class FakeFilter implements Filter
15 years, 1 month
JBoss hornetq SVN: r8395 - trunk/native/bin.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-24 14:05:15 -0500 (Tue, 24 Nov 2009)
New Revision: 8395
Modified:
trunk/native/bin/libHornetQAIO_ia64.so
Log:
ia64 bits compilation
Modified: trunk/native/bin/libHornetQAIO_ia64.so
===================================================================
(Binary files differ)
15 years, 1 month
JBoss hornetq SVN: r8394 - in trunk/src/main/org/hornetq/core: remoting/impl and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-24 13:08:58 -0500 (Tue, 24 Nov 2009)
New Revision: 8394
Modified:
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
Tweaks and Making sure GroupingFailoverTest will not fail
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-24 15:35:18 UTC (rev 8393)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-24 18:08:58 UTC (rev 8394)
@@ -134,6 +134,16 @@
{
super.close();
+ if (maxIOSemaphore != null)
+ {
+ while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
+ {
+ log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.getFileName());
+ }
+ }
+
+ maxIOSemaphore = null;
+
if (channel != null)
{
channel.close();
@@ -148,16 +158,6 @@
rfile = null;
- if (maxIOSemaphore != null)
- {
- while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
- {
- log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.getFileName());
- }
- }
-
- maxIOSemaphore = null;
-
notifyAll();
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-11-24 15:35:18 UTC (rev 8393)
+++ trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-11-24 18:08:58 UTC (rev 8394)
@@ -243,8 +243,6 @@
log.warn("Connection failure has been detected: " + me.getMessage() + " [code=" + me.getCode() + "]");
- System.out.println("Fail on RemotingConnectio");
-
// Then call the listeners
callFailureListeners(me);
@@ -401,7 +399,6 @@
for (final FailureListener listener : listenersClone)
{
- System.out.println("Calling failure listener: " + listener.getClass().getName());
try
{
listener.connectionFailed(me);
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-24 15:35:18 UTC (rev 8393)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-24 18:08:58 UTC (rev 8394)
@@ -371,7 +371,14 @@
while (!pendingTokens.isEmpty())
{
OperationContext ctx = pendingTokens.poll();
- ctx.replicationDone();
+ try
+ {
+ ctx.replicationDone();
+ }
+ catch (Throwable e)
+ {
+ log.warn("Error completing callback on replication manager", e);
+ }
}
if (replicatingChannel != null)
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-24 15:35:18 UTC (rev 8393)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-24 18:08:58 UTC (rev 8394)
@@ -381,6 +381,12 @@
storageManager.stop();
}
+ if (replicationManager != null)
+ {
+ replicationManager.stop();
+ replicationManager = null;
+ }
+
if (replicationEndpoint != null)
{
replicationEndpoint.stop();
@@ -417,12 +423,6 @@
log.debug("Waiting for " + task);
}
- if (replicationManager != null)
- {
- replicationManager.stop();
- replicationManager = null;
- }
-
threadPool.shutdown();
scheduledPool = null;
15 years, 1 month
JBoss hornetq SVN: r8393 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-24 10:35:18 -0500 (Tue, 24 Nov 2009)
New Revision: 8393
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
Log:
just a tweak
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-24 15:07:34 UTC (rev 8392)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-24 15:35:18 UTC (rev 8393)
@@ -858,7 +858,6 @@
appendAddRecord(id, recordType, record, sync, callback);
- // We only wait on explicit callbacks
if (callback != null)
{
callback.waitCompletion();
@@ -925,7 +924,6 @@
appendUpdateRecord(id, recordType, record, sync, callback);
- // We only wait on explicit callbacks
if (callback != null)
{
callback.waitCompletion();
@@ -1003,7 +1001,6 @@
appendDeleteRecord(id, sync, callback);
- // We only wait on explicit callbacks
if (callback != null)
{
callback.waitCompletion();
15 years, 1 month
JBoss hornetq SVN: r8392 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-24 10:07:34 -0500 (Tue, 24 Nov 2009)
New Revision: 8392
Modified:
trunk/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
Log:
Fixing race on blocking commits & fixing hanging tests
Modified: trunk/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TransactionCallback.java 2009-11-24 03:50:27 UTC (rev 8391)
+++ trunk/src/main/org/hornetq/core/journal/impl/TransactionCallback.java 2009-11-24 15:07:34 UTC (rev 8392)
@@ -49,8 +49,11 @@
countLatch.down();
if (++done == up && delegateCompletion != null)
{
- delegateCompletion.done();
+ final IOAsyncTask delegateToCall = delegateCompletion;
+ // We need to set the delegateCompletion to null first or blocking commits could miss a callback
+ // What would affect mainly tests
delegateCompletion = null;
+ delegateToCall.done();
}
}
15 years, 1 month