Author: timfox
Date: 2009-11-25 06:07:36 -0500 (Wed, 25 Nov 2009)
New Revision: 8401
Added:
branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQBuffers.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/jms/client/ReSendLargeMessageTest.java
Removed:
branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQChannelBuffers.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/20-optimisation/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/DivertImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQMessage.java
branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQStreamMessage.java
branches/20-optimisation/src/main/org/hornetq/utils/TypedProperties.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementHelperTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementServiceImplTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/CompactingStressTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/timing/util/UTF8Test.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/filter/impl/FilterTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/ChannelBufferWrapper2Test.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/util/TypedPropertiesTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/util/UTF8Test.java
branches/20-optimisation/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
optimisation
Copied: branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQBuffers.java
(from rev 8387,
branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQChannelBuffers.java)
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQBuffers.java
(rev 0)
+++
branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQBuffers.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.buffers;
+
+import java.nio.ByteBuffer;
+
+import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+
+/**
+ *
+ * HornetQBuffers: static factory methods that create HornetQBuffer instances
+ *
+ * @author tim
+ *
+ *
+ */
+public class HornetQBuffers
+{
+ public static HornetQBuffer dynamicBuffer(int estimatedLength)
+ {
+ return new ChannelBufferWrapper(ChannelBuffers.dynamicBuffer(estimatedLength));
+ }
+
+ public static HornetQBuffer dynamicBuffer(byte[] bytes)
+ {
+ ChannelBuffer buff = ChannelBuffers.dynamicBuffer(bytes.length);
+
+ buff.writeBytes(bytes);
+
+ return new ChannelBufferWrapper(buff);
+ }
+
+ public static HornetQBuffer wrappedBuffer(ByteBuffer underlying)
+ {
+ HornetQBuffer buff = new
ChannelBufferWrapper(ChannelBuffers.wrappedBuffer(underlying));
+
+ buff.clear();
+
+ return buff;
+ }
+
+ public static HornetQBuffer wrappedBuffer(byte[] underlying)
+ {
+ return new ChannelBufferWrapper(ChannelBuffers.wrappedBuffer(underlying));
+ }
+
+ public static HornetQBuffer fixedBuffer(int size)
+ {
+ return new ChannelBufferWrapper(ChannelBuffers.buffer(size));
+ }
+}
Deleted:
branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQChannelBuffers.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQChannelBuffers.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQChannelBuffers.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -1,61 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.buffers;
-
-import java.nio.ByteBuffer;
-
-import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-
-
-/**
- *
- * A HornetQChannelBuffers
- *
- * @author tim
- *
- *
- */
-public class HornetQChannelBuffers
-{
- public static HornetQBuffer dynamicBuffer(int estimatedLength)
- {
- return new ChannelBufferWrapper(ChannelBuffers.dynamicBuffer(estimatedLength));
- }
-
- public static HornetQBuffer dynamicBuffer(byte[] bytes)
- {
- ChannelBuffer buff = ChannelBuffers.dynamicBuffer(bytes.length);
-
- buff.writeBytes(bytes);
-
- return new ChannelBufferWrapper(buff);
- }
-
- public static HornetQBuffer wrappedBuffer(ByteBuffer underlying)
- {
- return new ChannelBufferWrapper(ChannelBuffers.wrappedBuffer(underlying));
- }
-
- public static HornetQBuffer wrappedBuffer(byte[] underlying)
- {
- return new ChannelBufferWrapper(ChannelBuffers.wrappedBuffer(underlying));
- }
-
- public static HornetQBuffer fixedBuffer(int size)
- {
- return new ChannelBufferWrapper(ChannelBuffers.buffer(size));
- }
-}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -17,7 +17,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.MessageHandler;
import org.hornetq.core.exception.HornetQException;
@@ -491,7 +491,7 @@
//FIXME - this is really inefficient - decoding from a buffer to a byte[] then from
the byte[] to another buffer
//which is then decoded to form the message! Clebert, what were you thinking?
-
currentChunkMessage.decodeHeadersAndProperties(HornetQChannelBuffers.wrappedBuffer(packet.getLargeMessageHeader()));
+
currentChunkMessage.decodeHeadersAndProperties(HornetQBuffers.wrappedBuffer(packet.getLargeMessageHeader()));
currentChunkMessage.setLargeMessage(true);
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -19,7 +19,7 @@
import java.io.InputStream;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.BodyEncoder;
@@ -308,7 +308,7 @@
msg.getWholeBuffer().readerIndex(0);
}
- HornetQBuffer headerBuffer = HornetQChannelBuffers.fixedBuffer(headerSize);
+ HornetQBuffer headerBuffer = HornetQBuffers.fixedBuffer(headerSize);
msg.encodeHeadersAndProperties(headerBuffer);
@@ -359,7 +359,7 @@
final int chunkLength = Math.min((int)(bodySize - pos),
minLargeMessageSize);
- final HornetQBuffer bodyBuffer =
HornetQChannelBuffers.fixedBuffer(chunkLength);
+ final HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(chunkLength);
context.encode(bodyBuffer, chunkLength);
Modified:
branches/20-optimisation/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -24,7 +24,7 @@
import java.util.Map;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.cluster.DiscoveryEntry;
import org.hornetq.core.cluster.DiscoveryGroup;
import org.hornetq.core.cluster.DiscoveryListener;
@@ -328,7 +328,7 @@
}
}
- HornetQBuffer buffer = HornetQChannelBuffers.wrappedBuffer(data);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(data);
String originatingNodeID = buffer.readString();
Modified:
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -19,7 +19,7 @@
import java.util.Set;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -97,12 +97,12 @@
{
controlFile.open(1);
- HornetQBuffer renameBuffer = HornetQChannelBuffers.dynamicBuffer(1);
+ HornetQBuffer renameBuffer = HornetQBuffers.dynamicBuffer(1);
renameBuffer.writeInt(-1);
renameBuffer.writeInt(-1);
- HornetQBuffer filesToRename = HornetQChannelBuffers.dynamicBuffer(1);
+ HornetQBuffer filesToRename = HornetQBuffers.dynamicBuffer(1);
// DataFiles first
@@ -205,8 +205,8 @@
flush();
ByteBuffer bufferWrite = fileFactory.newBuffer(journal.getFileSize());
- writingChannel = HornetQChannelBuffers.wrappedBuffer(bufferWrite);
-
+ writingChannel = HornetQBuffers.wrappedBuffer(bufferWrite);
+
currentFile = journal.getFile(false, false, false, true);
sequentialFile = currentFile.getFile();
Modified:
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -22,7 +22,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -83,7 +83,7 @@
}
else
{
- HornetQBuffer input =
HornetQChannelBuffers.wrappedBuffer(records.get(0).data);
+ HornetQBuffer input = HornetQBuffers.wrappedBuffer(records.get(0).data);
int numberDataFiles = input.readInt();
Modified:
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -42,7 +42,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.JournalLoadInformation;
@@ -3331,7 +3331,7 @@
private HornetQBuffer newBuffer(final int size)
{
- return HornetQChannelBuffers.fixedBuffer(size);
+ return HornetQBuffers.fixedBuffer(size);
}
// Inner classes
Modified:
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -22,7 +22,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.VariableLatch;
@@ -104,7 +104,7 @@
}
// Setting the interval for nano-sleeps
- buffer = HornetQChannelBuffers.fixedBuffer(bufferSize);
+ buffer = HornetQBuffers.fixedBuffer(bufferSize);
buffer.clear();
bufferLimit = 0;
Modified:
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -24,7 +24,7 @@
import java.util.Set;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.buffers.impl.ResetLimitWrappedHornetQBuffer;
import org.hornetq.core.client.LargeMessageBuffer;
import org.hornetq.core.exception.HornetQException;
@@ -96,8 +96,6 @@
protected HornetQBuffer buffer;
- // private int encodeSize;
-
// Constructors --------------------------------------------------
protected MessageImpl()
@@ -139,7 +137,7 @@
private void createBody(final int initialMessageBufferSize)
{
- buffer = HornetQChannelBuffers.dynamicBuffer(initialMessageBufferSize);
+ buffer = HornetQBuffers.dynamicBuffer(initialMessageBufferSize);
// There's a bug in netty which means a dynamic buffer won't resize until
you write a byte
buffer.writeByte((byte)0);
@@ -148,17 +146,17 @@
buffer.setIndex(limit, limit);
- //endOfBodyPosition = limit;
+ // endOfBodyPosition = limit;
}
// Message implementation ----------------------------------------
public int getEncodeSize()
- {
- int headersPropsSize = this.getHeadersAndPropertiesEncodeSize();
-
+ {
+ int headersPropsSize = this.getHeadersAndPropertiesEncodeSize();
+
int bodyPos = this.endOfBodyPosition == -1 ? buffer.writerIndex() :
this.endOfBodyPosition;
-
+
int bodySize = bodyPos - PacketImpl.PACKET_HEADERS_SIZE - DataConstants.SIZE_INT;
return SIZE_INT + bodySize + SIZE_INT + headersPropsSize;
@@ -176,27 +174,27 @@
}
public void encodeHeadersAndProperties(final HornetQBuffer buffer)
- {
+ {
buffer.writeLong(messageID);
- buffer.writeSimpleString(destination);
+ buffer.writeSimpleString(destination);
buffer.writeByte(type);
buffer.writeBoolean(durable);
buffer.writeLong(expiration);
buffer.writeLong(timestamp);
- buffer.writeByte(priority);
+ buffer.writeByte(priority);
properties.encode(buffer);
}
public void decodeHeadersAndProperties(final HornetQBuffer buffer)
{
- messageID = buffer.readLong();
- destination = buffer.readSimpleString();
+ messageID = buffer.readLong();
+ destination = buffer.readSimpleString();
type = buffer.readByte();
durable = buffer.readBoolean();
expiration = buffer.readLong();
timestamp = buffer.readLong();
- priority = buffer.readByte();
- properties.decode(buffer);
+ priority = buffer.readByte();
+ properties.decode(buffer);
}
public long getMessageID()
@@ -679,7 +677,7 @@
public int encode(ByteBuffer bufferRead) throws HornetQException
{
- HornetQBuffer buffer = HornetQChannelBuffers.wrappedBuffer(bufferRead);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bufferRead);
return encode(buffer, bufferRead.capacity());
}
@@ -691,7 +689,6 @@
}
}
-
protected ResetLimitWrappedHornetQBuffer bodyBuffer;
public HornetQBuffer getBodyBuffer()
@@ -724,8 +721,8 @@
/*
* Copy constructor
*/
- protected MessageImpl(final MessageImpl other)
- {
+ protected MessageImpl(final MessageImpl other, final boolean shallow)
+ {
messageID = other.getMessageID();
destination = other.getDestination();
type = other.getType();
@@ -734,12 +731,23 @@
timestamp = other.getTimestamp();
priority = other.getPriority();
properties = new TypedProperties(other.getProperties());
- // Note, this is a shallow copy - does not copy the buffer
- buffer = other.buffer;
+
this.bufferValid = other.bufferValid;
this.endOfBodyPosition = other.endOfBodyPosition;
this.endOfMessagePosition = other.endOfMessagePosition;
this.copied = other.copied;
+
+ if (shallow)
+ {
+ this.buffer = other.buffer;
+ }
+ else
+ {
+ // We need to copy the underlying buffer too, since the different messages
thereafter might have different
+ // properties set on them, making their encoding different
+ buffer = other.buffer.copy(0, other.buffer.capacity());
+ buffer.setIndex(other.buffer.readerIndex(), other.buffer.writerIndex());
+ }
}
public void bodyChanged()
@@ -749,7 +757,7 @@
checkCopy();
bufferValid = false;
-
+
this.endOfBodyPosition = -1;
}
@@ -762,7 +770,7 @@
copied = true;
}
}
-
+
public void resetCopied()
{
copied = false;
@@ -771,17 +779,17 @@
private void forceCopy()
{
// Must copy buffer before sending it
-
+
buffer = buffer.copy(0, buffer.capacity());
-
+
buffer.setIndex(0, this.endOfBodyPosition);
-
+
if (bodyBuffer != null)
{
bodyBuffer.setBuffer(buffer);
}
}
-
+
public int getEndOfMessagePosition()
{
return this.endOfMessagePosition;
@@ -791,12 +799,12 @@
{
return this.endOfBodyPosition;
}
-
+
// Encode to journal or paging
public void encode(HornetQBuffer buff)
{
encodeToBuffer();
-
+
buff.writeBytes(buffer, PacketImpl.PACKET_HEADERS_SIZE, endOfMessagePosition -
PacketImpl.PACKET_HEADERS_SIZE);
}
@@ -820,18 +828,18 @@
buff.readerIndex(start + length);
}
- //This must be synchronized as it can be called concurrently id the message is being
delivered concurently to
- //many queues - the first caller in this case will actually encode it
+ // This must be synchronized as it can be called concurrently if the message is being
delivered concurrently to
+ // many queues - the first caller in this case will actually encode it
public synchronized HornetQBuffer encodeToBuffer()
- {
+ {
if (!bufferValid)
- {
+ {
if (endOfBodyPosition == -1)
{
// Means sending message for first time
- endOfBodyPosition = buffer.writerIndex();
+ endOfBodyPosition = buffer.writerIndex();
}
-
+
// write it
buffer.setInt(PacketImpl.PACKET_HEADERS_SIZE, endOfBodyPosition);
@@ -848,7 +856,7 @@
this.bufferValid = true;
}
-
+
return buffer;
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PageImpl.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PageImpl.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -22,7 +22,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -98,7 +98,7 @@
buffer2.rewind();
- HornetQBuffer fileBuffer = HornetQChannelBuffers.wrappedBuffer(buffer2);
+ HornetQBuffer fileBuffer = HornetQBuffers.wrappedBuffer(buffer2);
fileBuffer.writerIndex(fileBuffer.capacity());
while (fileBuffer.readable())
@@ -149,7 +149,7 @@
{
ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + SIZE_RECORD);
- HornetQBuffer wrap = HornetQChannelBuffers.wrappedBuffer(buffer);
+ HornetQBuffer wrap = HornetQBuffers.wrappedBuffer(buffer);
wrap.clear();
wrap.writeByte(START_BYTE);
Modified:
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -18,7 +18,7 @@
import static org.hornetq.utils.DataConstants.SIZE_LONG;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.persistence.StorageManager;
@@ -76,7 +76,7 @@
if (largeMessageLazyData != null)
{
message = storage.createLargeMessage();
- HornetQBuffer buffer =
HornetQChannelBuffers.dynamicBuffer(largeMessageLazyData);
+ HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(largeMessageLazyData);
message.decodeHeadersAndProperties(buffer);
largeMessageLazyData = null;
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -33,7 +33,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
@@ -432,7 +432,7 @@
LargeServerMessageImpl largeMessage =
(LargeServerMessageImpl)createLargeMessage();
- HornetQBuffer headerBuffer = HornetQChannelBuffers.wrappedBuffer(header);
+ HornetQBuffer headerBuffer = HornetQBuffers.wrappedBuffer(header);
largeMessage.decodeHeadersAndProperties(headerBuffer);
@@ -680,7 +680,7 @@
{
byte[] data = record.data;
- HornetQBuffer buff = HornetQChannelBuffers.wrappedBuffer(data);
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
try
{
@@ -721,7 +721,7 @@
{
byte[] data = record.data;
- HornetQBuffer buff = HornetQChannelBuffers.wrappedBuffer(data);
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
byte recordType = record.getUserRecordType();
@@ -1008,7 +1008,7 @@
{
byte[] data = record.data;
- HornetQBuffer buff = HornetQChannelBuffers.wrappedBuffer(data);
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
byte recordType = record.getUserRecordType();
@@ -1140,7 +1140,7 @@
{
byte[] data = record.data;
- HornetQBuffer buff = HornetQChannelBuffers.wrappedBuffer(data);
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
long messageID = record.id;
@@ -1224,7 +1224,7 @@
{
long id = record.id;
- HornetQBuffer buffer = HornetQChannelBuffers.wrappedBuffer(record.data);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(record.data);
byte rec = record.getUserRecordType();
@@ -1441,7 +1441,7 @@
XidEncoding(final byte[] data)
{
- xid = XidCodecSupport.decodeXid(HornetQChannelBuffers.wrappedBuffer(data));
+ xid = XidCodecSupport.decodeXid(HornetQBuffers.wrappedBuffer(data));
}
public void decode(final HornetQBuffer buffer)
Modified:
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -14,7 +14,7 @@
package org.hornetq.core.persistence.impl.nullpm;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
Modified:
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -20,7 +20,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
@@ -196,7 +196,7 @@
{
NullStorageLargeServerMessage largeMessage = new NullStorageLargeServerMessage();
- HornetQBuffer headerBuffer = HornetQChannelBuffers.wrappedBuffer(header);
+ HornetQBuffer headerBuffer = HornetQBuffers.wrappedBuffer(header);
largeMessage.decodeHeadersAndProperties(headerBuffer);
Modified:
branches/20-optimisation/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -25,7 +25,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.management.impl.ManagementHelper;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
Modified:
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -16,7 +16,7 @@
import java.util.concurrent.RejectedExecutionException;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.spi.BufferHandler;
@@ -97,7 +97,7 @@
public HornetQBuffer createBuffer(final int size)
{
- return HornetQChannelBuffers.dynamicBuffer(size);
+ return HornetQBuffers.dynamicBuffer(size);
}
public Object getID()
Modified:
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -43,13 +43,6 @@
this.consumerID = consumerID;
this.deliveryCount = deliveryCount;
-
- //If the message hasn't already been copied when the headers/properties/body
was changed since last send
- //(which will prompt an invalidate(), which will cause a copy if not copied
already)
- //Then the message needs to be copied before delivering - the previous send may be
in the Netty write queue
- //so we can't just use the same buffer. Also we can't just duplicate, since
the extra data (consumerID, deliveryCount)
- //may well be different on different calls
- //message.forceCopy();
}
public SessionReceiveMessage()
@@ -76,8 +69,6 @@
@Override
public HornetQBuffer encode(final RemotingConnection connection)
{
- //message.setEndOfBodyPosition();
-
HornetQBuffer orig = message.encodeToBuffer();
//Now we must copy this buffer, before sending to Netty, as it could be
concurrently delivered to many consumers
@@ -85,7 +76,7 @@
HornetQBuffer buffer = orig.copy(0, orig.capacity());
buffer.setIndex(0, message.getEndOfMessagePosition());
-
+
buffer.writeLong(consumerID);
buffer.writeInt(deliveryCount);
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -39,9 +39,11 @@
int decrementDurableRefCount();
- ServerMessage copy(long newID) throws Exception;
+ ServerMessage copy(long newID);
- ServerMessage copy() throws Exception;
+ ServerMessage copy();
+
+ ServerMessage shallowCopy();
int getMemoryEstimate();
Modified:
branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -416,7 +416,7 @@
// the one pertinent for the destination node - this is important since
different queues on different
// nodes could have same queue ids
// Note we must copy since same message may get routed to other nodes which
require different headers
- message = message.copy();
+ message = message.shallowCopy();
// TODO - we can optimise this
Modified:
branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -21,7 +21,7 @@
import java.util.concurrent.ScheduledFuture;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.Notification;
@@ -203,7 +203,7 @@
return;
}
- HornetQBuffer buff = HornetQChannelBuffers.dynamicBuffer(4096);
+ HornetQBuffer buff = HornetQBuffers.dynamicBuffer(4096);
buff.writeString(nodeID);
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -83,6 +83,8 @@
// TODO we can optimise this so it doesn't copy if it's not routed anywhere
else
+ log.info("making copy for divert");
+
ServerMessage copy = message.copy();
copy.setDestination(forwardAddress);
Modified:
branches/20-optimisation/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -31,7 +31,7 @@
import javax.management.MBeanServer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.FailoverManager;
Modified:
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -22,7 +22,7 @@
import java.util.concurrent.locks.ReentrantLock;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.impl.ClientConsumerImpl;
import org.hornetq.core.client.management.impl.ManagementHelper;
import org.hornetq.core.exception.HornetQException;
@@ -676,7 +676,7 @@
if (!sentInitialPacket)
{
- HornetQBuffer headerBuffer =
HornetQChannelBuffers.fixedBuffer(largeMessage.getHeadersAndPropertiesEncodeSize());
+ HornetQBuffer headerBuffer =
HornetQBuffers.fixedBuffer(largeMessage.getHeadersAndPropertiesEncodeSize());
largeMessage.encodeHeadersAndProperties(headerBuffer);
@@ -805,7 +805,7 @@
localChunkLen = (int)Math.min(sizePendingLargeMessage -
positionPendingLargeMessage, minLargeMessageSize);
- HornetQBuffer bodyBuffer = HornetQChannelBuffers.fixedBuffer(localChunkLen);
+ HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(localChunkLen);
context.encode(bodyBuffer, localChunkLen);
Modified:
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -68,9 +68,9 @@
/*
* Copy constructor
*/
- protected ServerMessageImpl(final ServerMessageImpl other)
+ protected ServerMessageImpl(final ServerMessageImpl other, final boolean shallow)
{
- super(other);
+ super(other, shallow);
}
public void setMessageID(final long id)
@@ -162,19 +162,24 @@
return memoryEstimate;
}
- public ServerMessage copy(final long newID) throws Exception
+ public ServerMessage copy(final long newID)
{
- ServerMessage m = new ServerMessageImpl(this);
+ ServerMessage m = new ServerMessageImpl(this, false);
m.setMessageID(newID);
return m;
}
- public ServerMessage copy() throws Exception
+ public ServerMessage copy()
{
- return new ServerMessageImpl(this);
+ return new ServerMessageImpl(this, false);
}
+
+ public ServerMessage shallowCopy()
+ {
+ return new ServerMessageImpl(this, true);
+ }
public ServerMessage makeCopyForExpiryOrDLA(final long newID, final boolean expiry)
throws Exception
{
Modified: branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQMessage.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQMessage.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -34,7 +34,7 @@
import javax.jms.MessageNotWriteableException;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.impl.ClientMessageImpl;
Modified:
branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQStreamMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQStreamMessage.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQStreamMessage.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -19,7 +19,7 @@
import javax.jms.StreamMessage;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.impl.ClientMessageImpl;
Modified: branches/20-optimisation/src/main/org/hornetq/utils/TypedProperties.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/utils/TypedProperties.java 2009-11-25
03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/src/main/org/hornetq/utils/TypedProperties.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -454,7 +454,7 @@
for (int i = 0; i < numHeaders; i++)
{
- int len = buffer.readInt();
+ int len = buffer.readInt();
byte[] data = new byte[len];
buffer.readBytes(data);
SimpleString key = new SimpleString(data);
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -20,7 +20,7 @@
import junit.framework.AssertionFailedError;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -20,7 +20,7 @@
import java.util.List;
import java.util.Map;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -17,7 +17,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
Copied:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/jms/client/ReSendLargeMessageTest.java
(from rev 8344,
branches/20-optimisation/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java)
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/jms/client/ReSendLargeMessageTest.java
(rev 0)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/jms/client/ReSendLargeMessageTest.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -0,0 +1,311 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.jms.client;
+
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.jms.client.HornetQMessage;
+import org.hornetq.tests.util.JMSTestBase;
+import org.hornetq.utils.Pair;
+
+/**
+ * Receive Messages and resend them, like the bridge would do
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReSendLargeMessageTest extends JMSTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private Queue queue;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+   /**
+    * Builds ten rounds of four messages on a transacted session and hands the whole list to
+    * {@link #internalTestResend(ArrayList, Session)}.
+    * <p>
+    * Each round adds, in this order: a bytes message whose JMS_HORNETQ_INPUT_STREAM property is a
+    * fake stream of twice the default minimum large-message size, a map message with one entry
+    * per supported value type, a text message and an object message.
+    */
+   public void testResendMessage() throws Exception
+   {
+      Connection connection = cf.createConnection();
+      try
+      {
+         connection.start();
+
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         ArrayList<Message> messages = new ArrayList<Message>();
+
+         for (int round = 0; round < 10; round++)
+         {
+            // Large message: the body is supplied as a stream rather than written inline.
+            BytesMessage largeBytes = session.createBytesMessage();
+            largeBytes.setObjectProperty(HornetQMessage.JMS_HORNETQ_INPUT_STREAM,
+                                         createFakeLargeStream(2 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE));
+            messages.add(largeBytes);
+
+            MapMessage map = session.createMapMessage();
+            map.setBoolean("boolean", true);
+            map.setByte("byte", (byte)3);
+            map.setBytes("bytes", new byte[] { 3, 4, 5 });
+            map.setChar("char", (char)6);
+            map.setDouble("double", 7.0);
+            map.setFloat("float", 8.0f);
+            map.setInt("int", 9);
+            map.setLong("long", 10L);
+            map.setObject("object", new String("this is an object"));
+            map.setShort("short", (short)11);
+            map.setString("string", "this is a string");
+            messages.add(map);
+
+            String payload = "hello" + round;
+            messages.add(session.createTextMessage(payload));
+            messages.add(session.createObjectMessage(new SomeSerializable(payload)));
+         }
+
+         internalTestResend(messages, session);
+      }
+      finally
+      {
+         connection.close();
+      }
+   }
+
+   /**
+    * Sends the given messages, receives each one and sends the received instance again, then
+    * verifies that the re-sent copies match the originals.
+    * <p>
+    * The second delivery is checked per message type: bytes messages byte by byte against
+    * {@code getSamplebyte}, map messages on their "string", "long", "int" and "object" entries,
+    * object messages for an equal but distinct payload, and text messages on their text.
+    *
+    * @param msgs the messages to send, in the order they are expected to be received
+    * @param sess the session used to create the producer and consumer and to commit each step
+    * @throws Exception if any JMS operation fails
+    */
+   public void internalTestResend(ArrayList<Message> msgs, Session sess) throws Exception
+   {
+      MessageProducer prod = sess.createProducer(queue);
+
+      for (Message msg : msgs)
+      {
+         prod.send(msg);
+      }
+
+      sess.commit();
+
+      MessageConsumer cons = sess.createConsumer(queue);
+
+      // First pass: receive every message and send the received instance straight back.
+      for (int i = 0; i < msgs.size(); i++)
+      {
+         Message msg = cons.receive(5000);
+         assertNotNull(msg);
+
+         prod.send(msg);
+      }
+
+      // No further message is expected before the commit below.
+      assertNull(cons.receiveNoWait());
+
+      sess.commit();
+
+      // Second pass: the re-sent copies must arrive in the original order with the same content.
+      for (Message originalMessage : msgs)
+      {
+         Message copiedMessage = cons.receive(5000);
+         assertNotNull(copiedMessage);
+
+         assertEquals(originalMessage.getClass(), copiedMessage.getClass());
+
+         sess.commit();
+
+         if (copiedMessage instanceof BytesMessage)
+         {
+            BytesMessage copiedBytes = (BytesMessage)copiedMessage;
+
+            // Read the length once instead of on every loop iteration.
+            final long bodyLength = copiedBytes.getBodyLength();
+
+            for (int i = 0; i < bodyLength; i++)
+            {
+               assertEquals(getSamplebyte(i), copiedBytes.readByte());
+            }
+         }
+         else if (copiedMessage instanceof MapMessage)
+         {
+            MapMessage copiedMap = (MapMessage)copiedMessage;
+            MapMessage originalMap = (MapMessage)originalMessage;
+
+            // The entry is stored under "string"; the former key "str" matched no entry on
+            // either side, so that comparison could never fail.
+            assertEquals(originalMap.getString("string"), copiedMap.getString("string"));
+            assertEquals(originalMap.getLong("long"), copiedMap.getLong("long"));
+            assertEquals(originalMap.getInt("int"), copiedMap.getInt("int"));
+            assertEquals(originalMap.getObject("object"), copiedMap.getObject("object"));
+         }
+         else if (copiedMessage instanceof ObjectMessage)
+         {
+            // The payload must be an equal value but not the very same instance.
+            assertNotSame(((ObjectMessage)originalMessage).getObject(),
+                          ((ObjectMessage)copiedMessage).getObject());
+            assertEquals(((ObjectMessage)originalMessage).getObject(),
+                         ((ObjectMessage)copiedMessage).getObject());
+         }
+         else if (copiedMessage instanceof TextMessage)
+         {
+            assertEquals(((TextMessage)originalMessage).getText(),
+                         ((TextMessage)copiedMessage).getText());
+         }
+      }
+   }
+
+   /**
+    * Small serializable value holder used as an object-message payload in this test. Equality and
+    * hash code depend only on the wrapped text, which may be {@code null}.
+    */
+   public static class SomeSerializable implements Serializable
+   {
+      private static final long serialVersionUID = -8576054940441747312L;
+
+      final String txt;
+
+      SomeSerializable(final String txt)
+      {
+         this.txt = txt;
+      }
+
+      /**
+       * @return 31 plus the hash of the text, or 31 when the text is {@code null}
+       */
+      @Override
+      public int hashCode()
+      {
+         int textHash = 0;
+         if (txt != null)
+         {
+            textHash = txt.hashCode();
+         }
+         return 31 + textHash;
+      }
+
+      /**
+       * @return {@code true} only for an instance of exactly this class holding equal text
+       */
+      @Override
+      public boolean equals(final Object obj)
+      {
+         if (obj == this)
+         {
+            return true;
+         }
+         if (obj == null || obj.getClass() != getClass())
+         {
+            return false;
+         }
+
+         final SomeSerializable that = (SomeSerializable)obj;
+         return txt == null ? that.txt == null : txt.equals(that.txt);
+      }
+   }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+ /**
+ * Registers the connection factory for this test by calling
+ * jmsServer.createConnectionFactory with explicit retry and timeout settings and the
+ * statically imported ClientSessionFactoryImpl defaults for most other arguments.
+ * Presumably overrides a hook in JMSTestBase; there is no @Override here, so confirm.
+ *
+ * @param connectorConfigs connector pairs, passed through unchanged
+ * @param jndiBindings JNDI names, passed through unchanged
+ * @throws Exception propagated from createConnectionFactory
+ */
+ protected void createCF(List<Pair<TransportConfiguration,
TransportConfiguration>> connectorConfigs,
+ List<String> jndiBindings) throws Exception
+ {
+ // Values this test sets itself instead of taking the defaults.
+ int retryInterval = 1000;
+ double retryIntervalMultiplier = 1.0;
+ int reconnectAttempts = -1;
+ boolean failoverOnServerShutdown = true;
+ int callTimeout = 30000;
+
+ // NOTE(review): every argument below is positional. The bare 'true' after callTimeout
+ // and the two consecutive DEFAULT_ACK_BATCH_SIZE values cannot be checked from this
+ // file; confirm them against the createConnectionFactory signature.
+ // NOTE(review): the factory name looks carried over from another test; confirm that
+ // nothing looks it up by name.
+
jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest",
+ connectorConfigs,
+ null,
+ DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ DEFAULT_CONNECTION_TTL,
+ callTimeout,
+ true,
+ DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ DEFAULT_CONSUMER_WINDOW_SIZE,
+ DEFAULT_CONSUMER_MAX_RATE,
+ DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ DEFAULT_PRODUCER_WINDOW_SIZE,
+ DEFAULT_PRODUCER_MAX_RATE,
+ DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+ DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+ DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+ DEFAULT_AUTO_GROUP,
+ DEFAULT_PRE_ACKNOWLEDGE,
+
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+ DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_USE_GLOBAL_POOLS,
+ DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+ DEFAULT_THREAD_POOL_MAX_SIZE,
+ retryInterval,
+ retryIntervalMultiplier,
+ DEFAULT_MAX_RETRY_INTERVAL,
+ reconnectAttempts,
+ failoverOnServerShutdown,
+ jndiBindings);
+ }
+
+   /**
+    * Runs the base-class set-up and then creates the queue ("queue1") used by the test.
+    */
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      this.queue = createQueue("queue1");
+   }
+
+   /**
+    * Drops the queue reference and then runs the base-class tear-down.
+    */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      this.queue = null;
+
+      super.tearDown();
+   }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Deleted:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -1,312 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.jms.client;
-
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.jms.client.HornetQMessage;
-import org.hornetq.tests.util.JMSTestBase;
-import org.hornetq.utils.Pair;
-
-/**
- * Receive Messages and resend them, like the bridge would do
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class ResendTest extends JMSTestBase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private Queue queue;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testResendMessage() throws Exception
- {
- Connection conn = cf.createConnection();
- try
- {
- conn.start();
-
- Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
- ArrayList<Message> msgs = new ArrayList<Message>();
-
- for (int i = 0; i < 10; i++)
- {
- BytesMessage bm = sess.createBytesMessage();
- bm.setObjectProperty(HornetQMessage.JMS_HORNETQ_INPUT_STREAM,
- createFakeLargeStream(2 *
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE));
- msgs.add(bm);
-
- MapMessage mm = sess.createMapMessage();
- mm.setBoolean("boolean", true);
- mm.setByte("byte", (byte)3);
- mm.setBytes("bytes", new byte[] { (byte)3, (byte)4, (byte)5 });
- mm.setChar("char", (char)6);
- mm.setDouble("double", 7.0);
- mm.setFloat("float", 8.0f);
- mm.setInt("int", 9);
- mm.setLong("long", 10l);
- mm.setObject("object", new String("this is an object"));
- mm.setShort("short", (short)11);
- mm.setString("string", "this is a string");
-
- msgs.add(mm);
- msgs.add(sess.createTextMessage("hello" + i));
- msgs.add(sess.createObjectMessage(new SomeSerializable("hello" +
i)));
- }
-
- internalTestResend(msgs, sess);
-
- }
- finally
- {
- conn.close();
- }
-
- }
-
- public void internalTestResend(ArrayList<Message> msgs, Session sess) throws
Exception
- {
-
- MessageProducer prod = sess.createProducer(queue);
-
- for (Message msg : msgs)
- {
- prod.send(msg);
- }
-
- sess.commit();
-
- MessageConsumer cons = sess.createConsumer(queue);
-
- for (int i = 0; i < msgs.size(); i++)
- {
- Message msg = cons.receive(5000);
- assertNotNull(msg);
-
- prod.send(msg);
- }
-
- assertNull(cons.receiveNoWait());
-
- sess.commit();
-
- for (Message originalMessage : msgs)
- {
- Message copiedMessage = cons.receive(5000);
- assertNotNull(copiedMessage);
-
- assertEquals(copiedMessage.getClass(), originalMessage.getClass());
-
- sess.commit();
-
- if (copiedMessage instanceof BytesMessage)
- {
- BytesMessage copiedBytes = (BytesMessage)copiedMessage;
-
- for (int i = 0; i < copiedBytes.getBodyLength(); i++)
- {
- assertEquals(getSamplebyte(i), copiedBytes.readByte());
- }
- }
- else if (copiedMessage instanceof MapMessage)
- {
- MapMessage copiedMap = (MapMessage)copiedMessage;
- MapMessage originalMap = (MapMessage)originalMessage;
- assertEquals(originalMap.getString("str"),
copiedMap.getString("str"));
- assertEquals(originalMap.getLong("long"),
copiedMap.getLong("long"));
- assertEquals(originalMap.getInt("int"),
copiedMap.getInt("int"));
- assertEquals(originalMap.getObject("object"),
copiedMap.getObject("object"));
- }
- else if (copiedMessage instanceof ObjectMessage)
- {
- assertNotSame(((ObjectMessage)originalMessage).getObject(),
((ObjectMessage)copiedMessage).getObject());
- assertEquals(((ObjectMessage)originalMessage).getObject(),
((ObjectMessage)copiedMessage).getObject());
- }
- else if (copiedMessage instanceof TextMessage)
- {
- assertEquals(((TextMessage)originalMessage).getText(),
((TextMessage)copiedMessage).getText());
- }
- }
-
- }
-
- public static class SomeSerializable implements Serializable
- {
- /**
- *
- */
- private static final long serialVersionUID = -8576054940441747312L;
-
- final String txt;
-
- /* (non-Javadoc)
- * @see java.lang.Object#hashCode()
- */
- @Override
- public int hashCode()
- {
- final int prime = 31;
- int result = 1;
- result = prime * result + (txt == null ? 0 : txt.hashCode());
- return result;
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#equals(java.lang.Object)
- */
- @Override
- public boolean equals(final Object obj)
- {
- if (this == obj)
- {
- return true;
- }
- if (obj == null)
- {
- return false;
- }
- if (getClass() != obj.getClass())
- {
- return false;
- }
- SomeSerializable other = (SomeSerializable)obj;
- if (txt == null)
- {
- if (other.txt != null)
- {
- return false;
- }
- }
- else if (!txt.equals(other.txt))
- {
- return false;
- }
- return true;
- }
-
- SomeSerializable(final String txt)
- {
- this.txt = txt;
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
- protected void createCF(List<Pair<TransportConfiguration,
TransportConfiguration>> connectorConfigs,
- List<String> jndiBindings) throws Exception
- {
- int retryInterval = 1000;
- double retryIntervalMultiplier = 1.0;
- int reconnectAttempts = -1;
- boolean failoverOnServerShutdown = true;
- int callTimeout = 30000;
-
-
jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest",
- connectorConfigs,
- null,
- DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- DEFAULT_CONNECTION_TTL,
- callTimeout,
- true,
- DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- DEFAULT_CONSUMER_WINDOW_SIZE,
- DEFAULT_CONSUMER_MAX_RATE,
- DEFAULT_CONFIRMATION_WINDOW_SIZE,
- DEFAULT_PRODUCER_WINDOW_SIZE,
- DEFAULT_PRODUCER_MAX_RATE,
- DEFAULT_BLOCK_ON_ACKNOWLEDGE,
- DEFAULT_BLOCK_ON_PERSISTENT_SEND,
- DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
- DEFAULT_AUTO_GROUP,
- DEFAULT_PRE_ACKNOWLEDGE,
-
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
- DEFAULT_ACK_BATCH_SIZE,
- DEFAULT_ACK_BATCH_SIZE,
- DEFAULT_USE_GLOBAL_POOLS,
- DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- DEFAULT_THREAD_POOL_MAX_SIZE,
- retryInterval,
- retryIntervalMultiplier,
- DEFAULT_MAX_RETRY_INTERVAL,
- reconnectAttempts,
- failoverOnServerShutdown,
- jndiBindings);
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- queue = createQueue("queue1");
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- queue = null;
- super.tearDown();
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -25,7 +25,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
@@ -590,7 +590,7 @@
protected HornetQBuffer createLargeBuffer(final int numberOfIntegers)
{
- HornetQBuffer body = HornetQChannelBuffers.fixedBuffer(DataConstants.SIZE_INT * numberOfIntegers);
+ HornetQBuffer body = HornetQBuffers.fixedBuffer(DataConstants.SIZE_INT * numberOfIntegers);
for (int i = 0; i < numberOfIntegers; i++)
{
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementHelperTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementHelperTest.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementHelperTest.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -24,7 +24,7 @@
import junit.framework.TestCase;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.impl.ClientMessageImpl;
import org.hornetq.core.client.impl.ClientSessionImpl;
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementServiceImplTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementServiceImplTest.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementServiceImplTest.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -17,7 +17,7 @@
import static org.hornetq.tests.util.RandomUtil.randomString;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.management.impl.ManagementHelper;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -28,7 +28,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.FailoverManager;
@@ -326,7 +326,7 @@
serverMsg.setMessageID(500);
serverMsg.setDestination(new SimpleString("tttt"));
- HornetQBuffer buffer = HornetQChannelBuffers.dynamicBuffer(100);
+ HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(100);
serverMsg.encodeHeadersAndProperties(buffer);
manager.largeMessageBegin(500);
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -19,7 +19,7 @@
import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.config.impl.FileConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/CompactingStressTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/CompactingStressTest.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/CompactingStressTest.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -17,7 +17,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -16,7 +16,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -16,7 +16,7 @@
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.*;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -22,7 +22,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/timing/util/UTF8Test.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/timing/util/UTF8Test.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/timing/util/UTF8Test.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -14,7 +14,7 @@
package org.hornetq.tests.timing.util;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.UTF8Util;
@@ -47,7 +47,7 @@
public void testWriteUTF() throws Exception
{
- HornetQBuffer buffer = HornetQChannelBuffers.fixedBuffer(10 * 1024);
+ HornetQBuffer buffer = HornetQBuffers.fixedBuffer(10 * 1024);
long start = System.currentTimeMillis();
@@ -72,7 +72,7 @@
public void testReadUTF() throws Exception
{
- HornetQBuffer buffer = HornetQChannelBuffers.fixedBuffer(10 * 1024);
+ HornetQBuffer buffer = HornetQBuffers.fixedBuffer(10 * 1024);
buffer.writeUTF(str);
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -26,7 +26,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.MessageHandler;
import org.hornetq.core.client.impl.ClientConsumerInternal;
@@ -114,7 +114,7 @@
{
LargeMessageBufferImpl buffer = create15BytesSample();
- HornetQBuffer dstBuffer = HornetQChannelBuffers.fixedBuffer(20);
+ HornetQBuffer dstBuffer = HornetQBuffers.fixedBuffer(20);
dstBuffer.setIndex(0, 5);
@@ -168,7 +168,7 @@
public void testReadData() throws Exception
{
- HornetQBuffer dynamic = HornetQChannelBuffers.dynamicBuffer(1);
+ HornetQBuffer dynamic = HornetQBuffers.dynamicBuffer(1);
String str1 = RandomUtil.randomString();
String str2 = RandomUtil.randomString();
@@ -197,7 +197,7 @@
{
clearData();
- HornetQBuffer dynamic = HornetQChannelBuffers.dynamicBuffer(1);
+ HornetQBuffer dynamic = HornetQBuffers.dynamicBuffer(1);
String str1 = RandomUtil.randomString();
String str2 = RandomUtil.randomString();
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/filter/impl/FilterTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/filter/impl/FilterTest.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/filter/impl/FilterTest.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -13,10 +13,10 @@
package org.hornetq.tests.unit.core.filter.impl;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.tests.util.RandomUtil;
@@ -32,6 +32,8 @@
*/
public class FilterTest extends UnitTestCase
{
+ private static final Logger log = Logger.getLogger(FilterTest.class);
+
private Filter filter;
private ServerMessage message;
@@ -40,7 +42,7 @@
{
super.setUp();
- message = new ServerMessageImpl();
+ message = new ServerMessageImpl(1, 1000);
}
public void testFilterForgets() throws Exception
@@ -103,16 +105,19 @@
{
message.setDestination(RandomUtil.randomSimpleString());
- assertTrue(message.getEncodeSize() < 1024);
-
- Filter moreThan128 = FilterImpl.createFilter(new SimpleString("HQSize > 128"));
- Filter lessThan1024 = FilterImpl.createFilter(new SimpleString("HQSize < 1024"));
+ int encodeSize = message.getEncodeSize();
+
+ Filter moreThanSmall = FilterImpl.createFilter(new SimpleString("HQSize > " + (encodeSize - 1)));
+ Filter lessThanLarge = FilterImpl.createFilter(new SimpleString("HQSize < " + (encodeSize + 1)));
- assertFalse(moreThan128.match(message));
- assertTrue(lessThan1024.match(message));
+ Filter lessThanSmall = FilterImpl.createFilter(new SimpleString("HQSize < " + encodeSize));
+ Filter moreThanLarge = FilterImpl.createFilter(new SimpleString("HQSize > " + encodeSize));
+
+ assertTrue(moreThanSmall.match(message));
+ assertTrue(lessThanLarge.match(message));
- assertTrue(moreThan128.match(message));
- assertFalse(lessThan1024.match(message));
+ assertFalse(lessThanSmall.match(message));
+ assertFalse(moreThanLarge.match(message));
}
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -18,7 +18,7 @@
import java.util.List;
import java.util.concurrent.Executors;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.impl.PagedMessageImpl;
@@ -112,11 +112,12 @@
protected ServerMessage createMessage(final long messageId, final SimpleString destination, final ByteBuffer buffer)
{
- ServerMessage msg = new ServerMessageImpl();
+ ServerMessage msg = new ServerMessageImpl(messageId, 1000);
- msg.setMessageID(messageId);
-
msg.setDestination(destination);
+
+ msg.getBodyBuffer().writeBytes(buffer);
+
return msg;
}
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -29,7 +29,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -107,7 +107,7 @@
assertEquals(nr1, trans.getNumberOfMessages());
- HornetQBuffer buffer = HornetQChannelBuffers.fixedBuffer(trans.getEncodeSize());
+ HornetQBuffer buffer = HornetQBuffers.fixedBuffer(trans.getEncodeSize());
trans.encode(buffer);
@@ -205,7 +205,7 @@
buffers.add(buffer);
SimpleString destination = new SimpleString("test");
- ServerMessage msg = createMessage(storeImpl, destination, buffer);
+ ServerMessage msg = createMessage(1, storeImpl, destination, buffer);
assertTrue(storeImpl.isPaging());
@@ -268,7 +268,7 @@
buffers.add(buffer);
- ServerMessage msg = createMessage(storeImpl, destination, buffer);
+ ServerMessage msg = createMessage(i, storeImpl, destination, buffer);
assertTrue(storeImpl.page(msg, true));
}
@@ -342,7 +342,7 @@
storeImpl.forceAnotherPage();
}
- ServerMessage msg = createMessage(storeImpl, destination, buffer);
+ ServerMessage msg = createMessage(i, storeImpl, destination, buffer);
assertTrue(storeImpl.page(msg, true));
}
@@ -374,7 +374,7 @@
assertTrue(storeImpl.isPaging());
- ServerMessage msg = createMessage(storeImpl, destination, buffers.get(0));
+ ServerMessage msg = createMessage(1, storeImpl, destination, buffers.get(0));
assertTrue(storeImpl.page(msg, true));
@@ -493,7 +493,7 @@
// Each thread will Keep paging until all the messages are depaged.
// This is possible because the depage thread is not actually reading the pages.
// Just using the internal API to remove it from the page file system
- ServerMessage msg = createMessage(storeImpl, destination, createRandomBuffer(id, 5));
+ ServerMessage msg = createMessage(id, storeImpl, destination, createRandomBuffer(id, 5));
if (storeImpl.page(msg, false))
{
buffers.put(id, msg);
@@ -637,7 +637,7 @@
assertEquals(numberOfPages, storeImpl2.getNumberOfPages());
long lastMessageId = messageIdGenerator.incrementAndGet();
- ServerMessage lastMsg = createMessage(storeImpl, destination, createRandomBuffer(lastMessageId, 5));
+ ServerMessage lastMsg = createMessage(lastMessageId, storeImpl, destination, createRandomBuffer(lastMessageId, 5));
storeImpl2.page(lastMsg, true);
buffers2.put(lastMessageId, lastMsg);
@@ -702,22 +702,24 @@
return new FakePostOffice();
}
- private ServerMessage createMessage(final PagingStore store,
+ private ServerMessage createMessage(final long id, final PagingStore store,
final SimpleString destination,
final HornetQBuffer buffer)
{
- ServerMessage msg = new ServerMessageImpl();
+ ServerMessage msg = new ServerMessageImpl(id, 1000);
msg.setDestination(destination);
msg.setPagingStore(store);
+
+ msg.getBodyBuffer().writeBytes(buffer, buffer.capacity());
return msg;
}
private HornetQBuffer createRandomBuffer(final long id, final int size)
{
- HornetQBuffer buffer = HornetQChannelBuffers.fixedBuffer(size + 8);
+ HornetQBuffer buffer = HornetQBuffers.fixedBuffer(size + 8);
buffer.writeLong(id);
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -17,7 +17,7 @@
import java.util.ArrayList;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
@@ -145,7 +145,7 @@
{
if (record.userRecordType == JournalStorageManager.ID_COUNTER_RECORD)
{
- HornetQBuffer buffer = HornetQChannelBuffers.wrappedBuffer(record.data);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(record.data);
batch.loadState(record.id, buffer);
}
}
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/ChannelBufferWrapper2Test.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/ChannelBufferWrapper2Test.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/ChannelBufferWrapper2Test.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -14,7 +14,7 @@
package org.hornetq.tests.unit.core.remoting.impl.netty;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.tests.unit.core.remoting.HornetQBufferTestBase;
/**
@@ -43,7 +43,7 @@
@Override
protected HornetQBuffer createBuffer()
{
- return HornetQChannelBuffers.dynamicBuffer(512);
+ return HornetQBuffers.dynamicBuffer(512);
}
// Package protected ---------------------------------------------
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -20,7 +20,7 @@
import java.util.List;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.remoting.spi.Connection;
import org.hornetq.core.remoting.spi.ConnectionLifeCycleListener;
@@ -52,7 +52,7 @@
public void testWrite() throws Exception
{
- HornetQBuffer buff = HornetQChannelBuffers.wrappedBuffer(ByteBuffer.allocate(128));
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(ByteBuffer.allocate(128));
SimpleChannel channel = new SimpleChannel(randomInt());
assertEquals(0, channel.getWritten().size());
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/unit/util/TypedPropertiesTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/unit/util/TypedPropertiesTest.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/unit/util/TypedPropertiesTest.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -27,7 +27,7 @@
import java.util.Iterator;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
@@ -210,7 +210,7 @@
SimpleString keyToRemove = randomSimpleString();
props.putSimpleStringProperty(keyToRemove, randomSimpleString());
- HornetQBuffer buffer = HornetQChannelBuffers.dynamicBuffer(1024);
+ HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(1024);
props.encode(buffer);
assertEquals(props.getEncodeSize(), buffer.writerIndex());
@@ -234,7 +234,7 @@
{
TypedProperties emptyProps = new TypedProperties();
- HornetQBuffer buffer = HornetQChannelBuffers.dynamicBuffer(1024);
+ HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(1024);
emptyProps.encode(buffer);
assertEquals(props.getEncodeSize(), buffer.writerIndex());
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/util/UTF8Test.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/unit/util/UTF8Test.java 2009-11-25
03:26:45 UTC (rev 8400)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/unit/util/UTF8Test.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -20,7 +20,7 @@
import java.nio.ByteBuffer;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.DataConstants;
@@ -41,7 +41,7 @@
public void testValidateUTF() throws Exception
{
- HornetQBuffer buffer = HornetQChannelBuffers.fixedBuffer(60 * 1024);
+ HornetQBuffer buffer = HornetQBuffers.fixedBuffer(60 * 1024);
byte[] bytes = new byte[20000];
@@ -71,11 +71,11 @@
String str = new String(bytes);
// The maximum size the encoded UTF string would reach is str.length * 3 (look at the UTF8 implementation)
- testValidateUTFOnDataInputStream(str, HornetQChannelBuffers.wrappedBuffer(ByteBuffer.allocate(str.length() * 3 + DataConstants.SIZE_SHORT)));
+ testValidateUTFOnDataInputStream(str, HornetQBuffers.wrappedBuffer(ByteBuffer.allocate(str.length() * 3 + DataConstants.SIZE_SHORT)));
- testValidateUTFOnDataInputStream(str, HornetQChannelBuffers.dynamicBuffer(100));
+ testValidateUTFOnDataInputStream(str, HornetQBuffers.dynamicBuffer(100));
- testValidateUTFOnDataInputStream(str, HornetQChannelBuffers.fixedBuffer(100 * 1024));
+ testValidateUTFOnDataInputStream(str, HornetQBuffers.fixedBuffer(100 * 1024));
}
}
@@ -94,7 +94,7 @@
outData.writeUTF(str);
- HornetQBuffer buffer = HornetQChannelBuffers.wrappedBuffer(byteOut.toByteArray());
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(byteOut.toByteArray());
newStr = UTF8Util.readUTF(buffer);
@@ -113,7 +113,7 @@
String str = new String(chars);
- HornetQBuffer buffer = HornetQChannelBuffers.fixedBuffer(0xffff + 4);
+ HornetQBuffer buffer = HornetQBuffers.fixedBuffer(0xffff + 4);
try
{
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/util/UnitTestCase.java 2009-11-25
03:26:45 UTC (rev 8400)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/util/UnitTestCase.java 2009-11-25
11:07:36 UTC (rev 8401)
@@ -44,7 +44,7 @@
import junit.framework.TestSuite;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.buffers.HornetQChannelBuffers;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.exception.HornetQException;