[jboss-cvs] JBoss Messaging SVN: r5933 - in branches/JBMESSAGING_1394: src/main/org/jboss/messaging/core/client/impl and 19 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Feb 25 21:04:44 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-02-25 21:04:43 -0500 (Wed, 25 Feb 2009)
New Revision: 5933
Added:
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/ByteBufferBackedChannelBuffer.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/integration/transports/netty/ChannelBufferExtends.java
Removed:
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/ByteBufferWrapper.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/ExpandingMessagingBuffer.java
Modified:
branches/JBMESSAGING_1394/dotnet/JBMClient/remoting/impl/invm/InVMConnection.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/paging/impl/PagedMessageImpl.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/AbstractChannelBuffer.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/ChannelBuffer.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/ChannelBuffers.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/DynamicChannelBuffer.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/HeapChannelBuffer.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/AbstractBufferHandler.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/integration/transports/netty/ChannelBufferWrapper.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/integration/transports/netty/MessagingFrameDecoder.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/jms/client/JBossBytesMessage.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/jms/client/JBossMessage.java
branches/JBMESSAGING_1394/src/main/org/jboss/messaging/jms/client/JBossStreamMessage.java
branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/basic/AutoGroupClientTest.java
branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/basic/CoreClientTest.java
branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/basic/SelfExpandingBufferTest.java
branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeReconnectTest.java
branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeTest.java
branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/SimpleTransformer.java
branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ActivationTimeoutTest.java
branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/AutomaticFailoverWithDiscoveryTest.java
branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailBackupServerTest.java
branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverExpiredMessageTest.java
branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverManagementTest.java
branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverNoSessionsFailoverTest.java
branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverPreAcknowledgeTest.java
branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverScheduledMessageTest.java
branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailureOnCreateConnectionTest.java
branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/JustReplicationTest.java
branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
Back up
Modified: branches/JBMESSAGING_1394/dotnet/JBMClient/remoting/impl/invm/InVMConnection.java
===================================================================
--- branches/JBMESSAGING_1394/dotnet/JBMClient/remoting/impl/invm/InVMConnection.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/dotnet/JBMClient/remoting/impl/invm/InVMConnection.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -26,7 +26,6 @@
import java.util.concurrent.Executors;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.spi.BufferHandler;
import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -26,7 +26,7 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.buffers.ByteBufferBackedChannelBuffer;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
@@ -372,9 +372,9 @@
{
MessagingBuffer currentBody = currentChunkMessage.getBody();
- final int currentBodySize = currentBody == null ? 0 : currentBody.limit();
+ final int currentBodySize = currentBody == null ? 0 : currentBody.capacity();
- MessagingBuffer newBody = new ByteBufferWrapper(ByteBuffer.allocate(currentBodySize + body.limit()));
+ MessagingBuffer newBody = new ByteBufferBackedChannelBuffer(ByteBuffer.allocate(currentBodySize + body.limit()));
if (currentBody != null)
{
@@ -671,7 +671,7 @@
{
int propertiesSize = message.getPropertiesEncodeSize();
- MessagingBuffer bufferProperties = message.getBody().createNewBuffer(propertiesSize);
+ MessagingBuffer bufferProperties = session.createBuffer(propertiesSize);
// FIXME: Find a better way to clone this ClientMessageImpl as ClientFileMessageImpl without using the
// MessagingBuffer.
@@ -679,7 +679,7 @@
// abstraction
message.encodeProperties(bufferProperties);
- bufferProperties.rewind();
+ bufferProperties.resetReaderIndex();
ClientFileMessageImpl cloneMessage = new ClientFileMessageImpl();
@@ -708,7 +708,7 @@
private ClientMessageInternal createFileMessage(final byte[] header) throws Exception
{
- MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.wrap(header));
+ MessagingBuffer headerBuffer = new ByteBufferBackedChannelBuffer(ByteBuffer.wrap(header));
if (isFileConsumer())
{
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -31,7 +31,7 @@
import org.jboss.messaging.core.message.Message;
import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.buffers.HeapChannelBuffer;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
@@ -250,7 +250,7 @@
"Header size (" + headerSize + ") is too big, use the messageBody for large data, or increase minLargeMessageSize");
}
- MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(headerSize));
+ MessagingBuffer headerBuffer = new HeapChannelBuffer(headerSize);
msg.encodeProperties(headerBuffer);
final int bodySize = msg.getBodySize();
@@ -265,7 +265,7 @@
final int chunkLength = Math.min(bodySize - pos, minLargeMessageSize);
- final MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(chunkLength));
+ final MessagingBuffer bodyBuffer = new HeapChannelBuffer(chunkLength);
msg.encodeBody(bodyBuffer, pos, chunkLength);
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -48,7 +48,7 @@
import org.jboss.messaging.core.remoting.FailureListener;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.ExpandingMessagingBuffer;
+import org.jboss.messaging.core.remoting.buffers.DynamicChannelBuffer;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
@@ -577,7 +577,7 @@
final long timestamp,
final byte priority)
{
- MessagingBuffer body = remotingConnection.createBuffer(INITIAL_MESSAGE_BODY_SIZE);
+ MessagingBuffer body = createBuffer(INITIAL_MESSAGE_BODY_SIZE);
return new ClientMessageImpl(type, durable, expiration, timestamp, priority, body);
}
@@ -591,11 +591,20 @@
public ClientMessage createClientMessage(final boolean durable)
{
- MessagingBuffer body = new ExpandingMessagingBuffer(INITIAL_MESSAGE_BODY_SIZE);
+ MessagingBuffer body = createBuffer(INITIAL_MESSAGE_BODY_SIZE);
return new ClientMessageImpl(durable, body);
}
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.client.impl.ClientSessionInternal#createBuffer(int)
+ */
+ public MessagingBuffer createBuffer(int size)
+ {
+ return new DynamicChannelBuffer(size);
+ }
+
public ClientFileMessage createFileMessage(final boolean durable)
{
return new ClientFileMessageImpl(durable);
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -17,6 +17,7 @@
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
/**
* A ClientSessionInternal
@@ -28,6 +29,8 @@
String getName();
void acknowledge(long consumerID, long messageID) throws MessagingException;
+
+ MessagingBuffer createBuffer(int size);
void expire(long consumerID, long messageID) throws MessagingException;
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -60,7 +60,7 @@
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.journal.TestableJournal;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.buffers.ByteBufferBackedChannelBuffer;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.util.Pair;
import org.jboss.messaging.util.VariableLatch;
@@ -305,7 +305,7 @@
int size = SIZE_ADD_RECORD + recordLength;
- ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
+ ByteBufferBackedChannelBuffer bb = new ByteBufferBackedChannelBuffer(newBuffer(size));
bb.writeByte(ADD_RECORD);
bb.writeInt(-1); // skip ID part
@@ -317,7 +317,7 @@
try
{
- JournalFile usedFile = appendRecord(bb.getBuffer(), sync, null);
+ JournalFile usedFile = appendRecord(bb.toByteBuffer(), sync, null);
posFilesMap.put(id, new PosFiles(usedFile));
}
@@ -355,7 +355,7 @@
int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
- ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
+ ByteBufferBackedChannelBuffer bb = new ByteBufferBackedChannelBuffer(newBuffer(size));
bb.writeByte(UPDATE_RECORD);
bb.writeInt(-1); // skip ID part
@@ -367,7 +367,7 @@
try
{
- JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
+ JournalFile usedFile = appendRecord(bb.toByteBuffer(), syncNonTransactional, null);
posFiles.addUpdateFile(usedFile);
}
@@ -446,7 +446,7 @@
int size = SIZE_ADD_RECORD_TX + recordLength;
- ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
+ ByteBufferBackedChannelBuffer bb = new ByteBufferBackedChannelBuffer(newBuffer(size));
bb.writeByte(ADD_RECORD_TX);
bb.writeInt(-1); // skip ID part
@@ -459,7 +459,7 @@
try
{
- JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
+ JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
JournalTransaction tx = getTransactionInfo(txID);
@@ -498,7 +498,7 @@
int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize();
- ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
+ ByteBufferBackedChannelBuffer bb = new ByteBufferBackedChannelBuffer(newBuffer(size));
bb.writeByte(UPDATE_RECORD_TX);
bb.writeInt(-1); // skip ID part
@@ -511,7 +511,7 @@
try
{
- JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
+ JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
JournalTransaction tx = getTransactionInfo(txID);
@@ -544,7 +544,7 @@
int size = SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0);
- ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
+ ByteBufferBackedChannelBuffer bb = new ByteBufferBackedChannelBuffer(newBuffer(size));
bb.writeByte(DELETE_RECORD_TX);
bb.writeInt(-1); // skip ID part
@@ -559,7 +559,7 @@
try
{
- JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
+ JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
JournalTransaction tx = getTransactionInfo(txID);
@@ -587,7 +587,7 @@
int size = SIZE_DELETE_RECORD_TX;
- ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
+ ByteBufferBackedChannelBuffer bb = new ByteBufferBackedChannelBuffer(newBuffer(size));
bb.writeByte(DELETE_RECORD_TX);
bb.writeInt(-1); // skip ID part
@@ -598,7 +598,7 @@
try
{
- JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
+ JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
JournalTransaction tx = getTransactionInfo(txID);
@@ -1858,7 +1858,7 @@
if (transactionData != null)
{
- transactionData.encode(new ByteBufferWrapper(bb));
+ transactionData.encode(new ByteBufferBackedChannelBuffer(bb));
}
for (Map.Entry<Integer, AtomicInteger> entry : tx.getElementsSummary().entrySet())
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -70,7 +70,7 @@
import org.jboss.messaging.core.messagecounter.impl.MessageCounterManagerImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.buffers.ChannelBuffers;
import org.jboss.messaging.core.remoting.server.RemotingService;
import org.jboss.messaging.core.remoting.spi.Acceptor;
import org.jboss.messaging.core.security.Role;
@@ -557,7 +557,7 @@
// Now send message
ServerMessage notificationMessage = new ServerMessageImpl(storageManager.generateUniqueID());
- notificationMessage.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
+ notificationMessage.setBody(ChannelBuffers.EMPTY_BUFFER);
// Notification messages are always durable so the user can choose whether to add a durable queue to consume
// them in
notificationMessage.setDurable(true);
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -32,6 +32,7 @@
import org.jboss.messaging.core.client.SendAcknowledgementHandler;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.Message;
+import org.jboss.messaging.core.remoting.buffers.DynamicChannelBuffer;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.TypedProperties;
@@ -179,7 +180,7 @@
public int getBodySize()
{
- return /* BodySize and Body */body.limit();
+ return body.writerIndex();
}
public void encodeProperties(MessagingBuffer buffer)
@@ -197,7 +198,7 @@
public void encodeBody(MessagingBuffer buffer)
{
MessagingBuffer localBody = getBody();
- buffer.writeBytes(localBody.array(), 0, localBody.limit());
+ buffer.writeBytes(localBody.array(), 0, localBody.writerIndex());
}
// Used on Message chunk
@@ -228,11 +229,11 @@
public void decodeBody(final MessagingBuffer buffer)
{
int len = buffer.readInt();
- // TODO - this can be optimised
byte[] bytes = new byte[len];
buffer.readBytes(bytes);
- body = buffer.createNewBuffer(len);
- body.writeBytes(bytes);
+
+ // Reuse the same body on the initial body created
+ body = new DynamicChannelBuffer(bytes);
}
public long getMessageID()
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -35,7 +35,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.Page;
import org.jboss.messaging.core.paging.PagedMessage;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.buffers.ByteBufferBackedChannelBuffer;
/**
*
@@ -96,7 +96,7 @@
file.position(0);
file.read(buffer);
- ByteBufferWrapper messageBuffer = new ByteBufferWrapper(buffer);
+ ByteBufferBackedChannelBuffer messageBuffer = new ByteBufferBackedChannelBuffer(buffer);
while (buffer.hasRemaining())
{
@@ -146,7 +146,7 @@
ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + SIZE_RECORD);
buffer.put(START_BYTE);
buffer.putInt(message.getEncodeSize());
- message.encode(new ByteBufferWrapper(buffer));
+ message.encode(new ByteBufferBackedChannelBuffer(buffer));
buffer.put(END_BYTE);
buffer.rewind();
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/paging/impl/PagedMessageImpl.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/paging/impl/PagedMessageImpl.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/paging/impl/PagedMessageImpl.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -26,11 +26,9 @@
import static org.jboss.messaging.util.DataConstants.SIZE_INT;
import static org.jboss.messaging.util.DataConstants.SIZE_LONG;
-import java.nio.ByteBuffer;
-
import org.jboss.messaging.core.paging.PagedMessage;
import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.buffers.DynamicChannelBuffer;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.LargeServerMessage;
import org.jboss.messaging.core.server.ServerMessage;
@@ -84,7 +82,7 @@
if (largeMessageLazyData != null)
{
message = storage.createLargeMessage();
- MessagingBuffer buffer = new ByteBufferWrapper(ByteBuffer.wrap(largeMessageLazyData));
+ MessagingBuffer buffer = new DynamicChannelBuffer(largeMessageLazyData);
message.decode(buffer);
largeMessageLazyData = null;
}
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -62,7 +62,8 @@
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.buffers.ByteBufferBackedChannelBuffer;
+import org.jboss.messaging.core.remoting.buffers.HeapChannelBuffer;
import org.jboss.messaging.core.remoting.impl.wireformat.XidCodecSupport;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.JournalType;
@@ -470,7 +471,7 @@
ByteBuffer bb = ByteBuffer.wrap(data);
- MessagingBuffer buff = new ByteBufferWrapper(bb);
+ MessagingBuffer buff = new ByteBufferBackedChannelBuffer(bb);
byte recordType = record.getUserRecordType();
@@ -712,7 +713,7 @@
ByteBuffer bb = ByteBuffer.wrap(data);
- MessagingBuffer buff = new ByteBufferWrapper(bb);
+ MessagingBuffer buff = new ByteBufferBackedChannelBuffer(bb);
byte recordType = record.getUserRecordType();
@@ -839,7 +840,7 @@
ByteBuffer bb = ByteBuffer.wrap(data);
- MessagingBuffer buff = new ByteBufferWrapper(bb);
+ MessagingBuffer buff = new ByteBufferBackedChannelBuffer(bb);
long messageID = record.id;
@@ -946,7 +947,7 @@
{
long id = record.id;
- MessagingBuffer buffer = new ByteBufferWrapper(ByteBuffer.wrap(record.data));
+ MessagingBuffer buffer = new HeapChannelBuffer(record.data);
byte rec = record.getUserRecordType();
@@ -1148,7 +1149,7 @@
XidEncoding(final byte[] data)
{
- xid = XidCodecSupport.decodeXid(new ByteBufferWrapper(ByteBuffer.wrap(data)));
+ xid = XidCodecSupport.decodeXid(new HeapChannelBuffer(data));
}
public void decode(final MessagingBuffer buffer)
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -22,9 +22,7 @@
package org.jboss.messaging.core.persistence.impl.nullpm;
-import java.nio.ByteBuffer;
-
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.buffers.DynamicChannelBuffer;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.LargeServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
@@ -72,18 +70,14 @@
if (buffer != null)
{
- ByteBuffer newBuffer = ByteBuffer.allocate(buffer.limit() + bytes.length);
- newBuffer.put(buffer.array());
- buffer = new ByteBufferWrapper(newBuffer);
- setBody(buffer);
+ // expand the buffer
+ buffer.writeBytes(bytes);
}
else
{
- buffer = new ByteBufferWrapper(ByteBuffer.allocate(bytes.length));
- setBody(buffer);
+ // Reuse the initial byte array on the buffer construction
+ buffer = new DynamicChannelBuffer(bytes);
}
-
- buffer.writeBytes(bytes);
}
/* (non-Javadoc)
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -54,7 +54,8 @@
import org.jboss.messaging.core.postoffice.DuplicateIDCache;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.postoffice.QueueInfo;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.buffers.ChannelBuffers;
+import org.jboss.messaging.core.remoting.buffers.HeapChannelBuffer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.SendLock;
@@ -770,7 +771,7 @@
// First send a reset message
ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
- message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
+ message.setBody(ChannelBuffers.EMPTY_BUFFER);
message.setDestination(queueName);
message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
queue.preroute(message, null);
@@ -840,7 +841,7 @@
private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName)
{
ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
- message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
+ message.setBody(ChannelBuffers.EMPTY_BUFFER);
message.setDestination(queueName);
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/AbstractChannelBuffer.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/AbstractChannelBuffer.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/AbstractChannelBuffer.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.remoting.buffers;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -30,7 +31,9 @@
import java.nio.channels.ScatteringByteChannel;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.UTF8Util;
/**
* A skeletal implementation of a buffer.
@@ -599,16 +602,13 @@
{
return this;
}
-
- public abstract byte[] array();
/* (non-Javadoc)
* @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#readBoolean()
*/
public boolean readBoolean()
{
- // TODO Auto-generated method stub
- return false;
+ return readByte() != 0;
}
/* (non-Javadoc)
@@ -616,8 +616,7 @@
*/
public char readChar()
{
- // TODO Auto-generated method stub
- return 0;
+ return (char)readShort();
}
/* (non-Javadoc)
@@ -625,8 +624,7 @@
*/
public double readDouble()
{
- // TODO Auto-generated method stub
- return 0;
+ return Double.longBitsToDouble(readLong());
}
/* (non-Javadoc)
@@ -634,8 +632,7 @@
*/
public float readFloat()
{
- // TODO Auto-generated method stub
- return 0;
+ return Float.intBitsToFloat(readInt());
}
/* (non-Javadoc)
@@ -643,8 +640,15 @@
*/
public SimpleString readNullableSimpleString()
{
- // TODO Auto-generated method stub
- return null;
+ int b = readByte();
+ if (b == DataConstants.NULL)
+ {
+ return null;
+ }
+ else
+ {
+ return readSimpleString();
+ }
}
/* (non-Javadoc)
@@ -652,8 +656,15 @@
*/
public String readNullableString()
{
- // TODO Auto-generated method stub
- return null;
+ int b = readByte();
+ if (b == DataConstants.NULL)
+ {
+ return null;
+ }
+ else
+ {
+ return readString();
+ }
}
/* (non-Javadoc)
@@ -661,8 +672,10 @@
*/
public SimpleString readSimpleString()
{
- // TODO Auto-generated method stub
- return null;
+ int len = readInt();
+ byte[] data = new byte[len];
+ readBytes(data);
+ return new SimpleString(data);
}
/* (non-Javadoc)
@@ -670,8 +683,13 @@
*/
public String readString()
{
- // TODO Auto-generated method stub
- return null;
+ int len = readInt();
+ char[] chars = new char[len];
+ for (int i = 0; i < len; i++)
+ {
+ chars[i] = readChar();
+ }
+ return new String(chars);
}
/* (non-Javadoc)
@@ -679,8 +697,7 @@
*/
public String readUTF() throws Exception
{
- // TODO Auto-generated method stub
- return null;
+ return UTF8Util.readUTF(this);
}
/* (non-Javadoc)
@@ -688,8 +705,7 @@
*/
public void writeBoolean(boolean val)
{
- // TODO Auto-generated method stub
-
+ writeByte((byte) (val ? -1 : 0));
}
/* (non-Javadoc)
@@ -697,8 +713,7 @@
*/
public void writeChar(char val)
{
- // TODO Auto-generated method stub
-
+ writeShort((short)val);
}
/* (non-Javadoc)
@@ -706,7 +721,7 @@
*/
public void writeDouble(double val)
{
- // TODO Auto-generated method stub
+ writeLong(Double.doubleToLongBits(val));
}
@@ -715,7 +730,7 @@
*/
public void writeFloat(float val)
{
- // TODO Auto-generated method stub
+ writeInt(Float.floatToIntBits(val));
}
@@ -724,8 +739,15 @@
*/
public void writeNullableSimpleString(SimpleString val)
{
- // TODO Auto-generated method stub
-
+ if (val == null)
+ {
+ writeByte(DataConstants.NULL);
+ }
+ else
+ {
+ writeByte(DataConstants.NOT_NULL);
+ writeSimpleString(val);
+ }
}
/* (non-Javadoc)
@@ -733,8 +755,15 @@
*/
public void writeNullableString(String val)
{
- // TODO Auto-generated method stub
-
+ if (val == null)
+ {
+ writeByte(DataConstants.NULL);
+ }
+ else
+ {
+ writeByte(DataConstants.NOT_NULL);
+ writeString(val);
+ }
}
/* (non-Javadoc)
@@ -742,8 +771,9 @@
*/
public void writeSimpleString(SimpleString val)
{
- // TODO Auto-generated method stub
-
+ byte[] data = val.getData();
+ writeInt(data.length);
+ writeBytes(data);
}
/* (non-Javadoc)
@@ -751,8 +781,11 @@
*/
public void writeString(String val)
{
- // TODO Auto-generated method stub
-
+ writeInt(val.length());
+ for (int i = 0; i < val.length(); i++)
+ {
+ writeShort((short)val.charAt(i));
+ }
}
/* (non-Javadoc)
@@ -760,8 +793,7 @@
*/
public void writeUTF(String utf) throws Exception
{
- // TODO Auto-generated method stub
-
+ UTF8Util.saveUTF(this, utf);
}
}
Added: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/ByteBufferBackedChannelBuffer.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/ByteBufferBackedChannelBuffer.java (rev 0)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/ByteBufferBackedChannelBuffer.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -0,0 +1,436 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.buffers;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.nio.charset.UnsupportedCharsetException;
+
+/**
+ * A NIO {@link ByteBuffer} based buffer. It is recommended to use {@link ChannelBuffers#directBuffer(int)}
+ * and {@link ChannelBuffers#wrappedBuffer(ByteBuffer)} instead of calling the
+ * constructor explicitly.
+ *
+ * @author The Netty Project (netty-dev at lists.jboss.org)
+ * @author Trustin Lee (tlee at redhat.com)
+ *
+ * @version $Rev: 486 $, $Date: 2008-11-16 22:52:47 +0900 (Sun, 16 Nov 2008) $
+ *
+ */
+public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer
+{
+
+ private final ByteBuffer buffer;
+
+ private final int capacity;
+
+ /**
+ * Creates a new buffer which wraps the specified buffer's slice.
+ */
+ public ByteBufferBackedChannelBuffer(final ByteBuffer buffer)
+ {
+ if (buffer == null)
+ {
+ throw new NullPointerException("buffer");
+ }
+
+ this.buffer = buffer.slice();
+ capacity = buffer.remaining();
+ writerIndex(buffer.position());
+ }
+
+ private ByteBufferBackedChannelBuffer(final ByteBufferBackedChannelBuffer buffer)
+ {
+ this.buffer = buffer.buffer;
+ capacity = buffer.capacity;
+ setIndex(buffer.readerIndex(), buffer.writerIndex());
+ }
+
+ public ByteOrder order()
+ {
+ return buffer.order();
+ }
+
+ public int capacity()
+ {
+ return capacity;
+ }
+
+ public byte getByte(final int index)
+ {
+ return buffer.get(index);
+ }
+
+ public short getShort(final int index)
+ {
+ return buffer.getShort(index);
+ }
+
+ public int getUnsignedMedium(final int index)
+ {
+ return (getByte(index) & 0xff) << 16 | (getByte(index + 1) & 0xff) << 8 | (getByte(index + 2) & 0xff) << 0;
+ }
+
+ public int getInt(final int index)
+ {
+ return buffer.getInt(index);
+ }
+
+ public long getLong(final int index)
+ {
+ return buffer.getLong(index);
+ }
+
+ public void getBytes(final int index, final ChannelBuffer dst, final int dstIndex, final int length)
+ {
+ if (dst instanceof ByteBufferBackedChannelBuffer)
+ {
+ ByteBufferBackedChannelBuffer bbdst = (ByteBufferBackedChannelBuffer)dst;
+ ByteBuffer data = bbdst.buffer.duplicate();
+
+ data.limit(dstIndex + length).position(dstIndex);
+ getBytes(index, data);
+ }
+ else if (buffer.hasArray())
+ {
+ dst.setBytes(dstIndex, buffer.array(), index + buffer.arrayOffset(), length);
+ }
+ else
+ {
+ dst.setBytes(dstIndex, this, index, length);
+ }
+ }
+
+ public void getBytes(final int index, final byte[] dst, final int dstIndex, final int length)
+ {
+ ByteBuffer data = buffer.duplicate();
+ try
+ {
+ data.limit(index + length).position(index);
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ data.get(dst, dstIndex, length);
+ }
+
+ public void getBytes(final int index, final ByteBuffer dst)
+ {
+ ByteBuffer data = buffer.duplicate();
+ int bytesToCopy = Math.min(capacity() - index, dst.remaining());
+ try
+ {
+ data.limit(index + bytesToCopy).position(index);
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ dst.put(data);
+ }
+
+ public void setByte(final int index, final byte value)
+ {
+ buffer.put(index, value);
+ }
+
+ public void setShort(final int index, final short value)
+ {
+ buffer.putShort(index, value);
+ }
+
+ public void setMedium(final int index, final int value)
+ {
+ setByte(index, (byte)(value >>> 16));
+ setByte(index + 1, (byte)(value >>> 8));
+ setByte(index + 2, (byte)(value >>> 0));
+ }
+
+ public void setInt(final int index, final int value)
+ {
+ buffer.putInt(index, value);
+ }
+
+ public void setLong(final int index, final long value)
+ {
+ buffer.putLong(index, value);
+ }
+
+ public void setBytes(final int index, final ChannelBuffer src, final int srcIndex, final int length)
+ {
+ if (src instanceof ByteBufferBackedChannelBuffer)
+ {
+ ByteBufferBackedChannelBuffer bbsrc = (ByteBufferBackedChannelBuffer)src;
+ ByteBuffer data = bbsrc.buffer.duplicate();
+
+ data.limit(srcIndex + length).position(srcIndex);
+ setBytes(index, data);
+ }
+ else if (buffer.hasArray())
+ {
+ src.getBytes(srcIndex, buffer.array(), index + buffer.arrayOffset(), length);
+ }
+ else
+ {
+ src.getBytes(srcIndex, this, index, length);
+ }
+ }
+
+ public void setBytes(final int index, final byte[] src, final int srcIndex, final int length)
+ {
+ ByteBuffer data = buffer.duplicate();
+ data.limit(index + length).position(index);
+ data.put(src, srcIndex, length);
+ }
+
+ public void setBytes(final int index, final ByteBuffer src)
+ {
+ ByteBuffer data = buffer.duplicate();
+ data.limit(index + src.remaining()).position(index);
+ data.put(src);
+ }
+
+ public void getBytes(final int index, final OutputStream out, final int length) throws IOException
+ {
+ if (length == 0)
+ {
+ return;
+ }
+
+ if (!buffer.isReadOnly() && buffer.hasArray())
+ {
+ out.write(buffer.array(), index + buffer.arrayOffset(), length);
+ }
+ else
+ {
+ byte[] tmp = new byte[length];
+ ((ByteBuffer)buffer.duplicate().position(index)).get(tmp);
+ out.write(tmp);
+ }
+ }
+
+ public int getBytes(final int index, final GatheringByteChannel out, final int length) throws IOException
+ {
+ if (length == 0)
+ {
+ return 0;
+ }
+
+ return out.write((ByteBuffer)buffer.duplicate().position(index).limit(index + length));
+ }
+
+ public int setBytes(int index, final InputStream in, int length) throws IOException
+ {
+
+ int readBytes = 0;
+
+ if (!buffer.isReadOnly() && buffer.hasArray())
+ {
+ index += buffer.arrayOffset();
+ do
+ {
+ int localReadBytes = in.read(buffer.array(), index, length);
+ if (localReadBytes < 0)
+ {
+ if (readBytes == 0)
+ {
+ return -1;
+ }
+ else
+ {
+ break;
+ }
+ }
+ readBytes += localReadBytes;
+ index += localReadBytes;
+ length -= localReadBytes;
+ }
+ while (length > 0);
+ }
+ else
+ {
+ byte[] tmp = new byte[length];
+ int i = 0;
+ do
+ {
+ int localReadBytes = in.read(tmp, i, tmp.length - i);
+ if (localReadBytes < 0)
+ {
+ if (readBytes == 0)
+ {
+ return -1;
+ }
+ else
+ {
+ break;
+ }
+ }
+ readBytes += localReadBytes;
+ i += readBytes;
+ }
+ while (i < tmp.length);
+ ((ByteBuffer)buffer.duplicate().position(index)).put(tmp);
+ }
+
+ return readBytes;
+ }
+
+ public int setBytes(final int index, final ScatteringByteChannel in, final int length) throws IOException
+ {
+
+ ByteBuffer slice = (ByteBuffer)buffer.duplicate().limit(index + length).position(index);
+ int readBytes = 0;
+
+ while (readBytes < length)
+ {
+ int localReadBytes;
+ try
+ {
+ localReadBytes = in.read(slice);
+ }
+ catch (ClosedChannelException e)
+ {
+ localReadBytes = -1;
+ }
+ if (localReadBytes < 0)
+ {
+ if (readBytes == 0)
+ {
+ return -1;
+ }
+ else
+ {
+ return readBytes;
+ }
+ }
+ else if (localReadBytes == 0)
+ {
+ break;
+ }
+ readBytes += localReadBytes;
+ }
+
+ return readBytes;
+ }
+
+ public ByteBuffer toByteBuffer(final int index, final int length)
+ {
+ if (index == 0 && length == capacity())
+ {
+ return buffer.duplicate();
+ }
+ else
+ {
+ return ((ByteBuffer)buffer.duplicate().position(index).limit(index + length)).slice();
+ }
+ }
+
+ @Override
+ public ByteBuffer toByteBuffer()
+ {
+ return buffer;
+ }
+
+ public String toString(final int index, final int length, final String charsetName)
+ {
+ if (!buffer.isReadOnly() && buffer.hasArray())
+ {
+ try
+ {
+ return new String(buffer.array(), index + buffer.arrayOffset(), length, charsetName);
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new UnsupportedCharsetException(charsetName);
+ }
+ }
+ else
+ {
+ byte[] tmp = new byte[length];
+ ((ByteBuffer)buffer.duplicate().position(index)).get(tmp);
+ try
+ {
+ return new String(tmp, charsetName);
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new UnsupportedCharsetException(charsetName);
+ }
+ }
+ }
+
+ public ChannelBuffer slice(final int index, final int length)
+ {
+ if (index == 0 && length == capacity())
+ {
+ return duplicate();
+ }
+ else
+ {
+ if (index >= 0 && length == 0)
+ {
+ return ChannelBuffers.EMPTY_BUFFER;
+ }
+ return new ByteBufferBackedChannelBuffer(((ByteBuffer)buffer.duplicate().position(index).limit(index + length)));
+ }
+ }
+
+ public ChannelBuffer duplicate()
+ {
+ return new ByteBufferBackedChannelBuffer(this);
+ }
+
+ public ChannelBuffer copy(final int index, final int length)
+ {
+ ByteBuffer src;
+ try
+ {
+ src = (ByteBuffer)buffer.duplicate().position(index).limit(index + length);
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new IndexOutOfBoundsException();
+ }
+
+ ByteBuffer dst = buffer.isDirect() ? ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length);
+ dst.put(src);
+ dst.clear();
+ return new ByteBufferBackedChannelBuffer(dst);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.buffers.ChannelBuffer#array()
+ */
+ public byte[] array()
+ {
+ return buffer.array();
+ }
+}
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/ChannelBuffer.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/ChannelBuffer.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/ChannelBuffer.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -240,6 +240,8 @@
* Returns the number of bytes (octets) this buffer can contain.
*/
int capacity();
+
+ byte[] array();
/**
* Returns the {@code readerIndex} of this buffer.
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/ChannelBuffers.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/ChannelBuffers.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/ChannelBuffers.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -91,7 +91,7 @@
/**
* A buffer whose capacity is {@code 0}.
*/
- public static final ChannelBuffer EMPTY_BUFFER = new HeapChannelBuffer(0);
+ public static final HeapChannelBuffer EMPTY_BUFFER = new HeapChannelBuffer(0);
private static final char[] HEXDUMP_TABLE = new char[65536 * 4];
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/DynamicChannelBuffer.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/DynamicChannelBuffer.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/DynamicChannelBuffer.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -47,7 +47,7 @@
private ChannelBuffer buffer = ChannelBuffers.EMPTY_BUFFER;
- public DynamicChannelBuffer(int estimatedLength)
+ public DynamicChannelBuffer(final int estimatedLength)
{
if (estimatedLength < 0)
{
@@ -56,185 +56,194 @@
initialCapacity = estimatedLength;
}
+ public DynamicChannelBuffer(final byte[] initialBuffer)
+ {
+ initialCapacity = initialBuffer.length;
+
+ buffer = new HeapChannelBuffer(initialBuffer);
+
+ writerIndex(initialBuffer.length);
+ }
+
public int capacity()
{
return buffer.capacity();
}
- public byte getByte(int index)
+ public byte getByte(final int index)
{
return buffer.getByte(index);
}
- public short getShort(int index)
+ public short getShort(final int index)
{
return buffer.getShort(index);
}
- public int getUnsignedMedium(int index)
+ public int getUnsignedMedium(final int index)
{
return buffer.getUnsignedMedium(index);
}
- public int getInt(int index)
+ public int getInt(final int index)
{
return buffer.getInt(index);
}
- public long getLong(int index)
+ public long getLong(final int index)
{
return buffer.getLong(index);
}
- public void getBytes(int index, byte[] dst, int dstIndex, int length)
+ public void getBytes(final int index, final byte[] dst, final int dstIndex, final int length)
{
buffer.getBytes(index, dst, dstIndex, length);
}
- public void getBytes(int index, ChannelBuffer dst, int dstIndex, int length)
+ public void getBytes(final int index, final ChannelBuffer dst, final int dstIndex, final int length)
{
buffer.getBytes(index, dst, dstIndex, length);
}
- public void getBytes(int index, ByteBuffer dst)
+ public void getBytes(final int index, final ByteBuffer dst)
{
buffer.getBytes(index, dst);
}
- public int getBytes(int index, GatheringByteChannel out, int length) throws IOException
+ public int getBytes(final int index, final GatheringByteChannel out, final int length) throws IOException
{
return buffer.getBytes(index, out, length);
}
- public void getBytes(int index, OutputStream out, int length) throws IOException
+ public void getBytes(final int index, final OutputStream out, final int length) throws IOException
{
buffer.getBytes(index, out, length);
}
- public void setByte(int index, byte value)
+ public void setByte(final int index, final byte value)
{
buffer.setByte(index, value);
}
- public void setShort(int index, short value)
+ public void setShort(final int index, final short value)
{
buffer.setShort(index, value);
}
- public void setMedium(int index, int value)
+ public void setMedium(final int index, final int value)
{
buffer.setMedium(index, value);
}
- public void setInt(int index, int value)
+ public void setInt(final int index, final int value)
{
buffer.setInt(index, value);
}
- public void setLong(int index, long value)
+ public void setLong(final int index, final long value)
{
buffer.setLong(index, value);
}
- public void setBytes(int index, byte[] src, int srcIndex, int length)
+ public void setBytes(final int index, final byte[] src, final int srcIndex, final int length)
{
buffer.setBytes(index, src, srcIndex, length);
}
- public void setBytes(int index, ChannelBuffer src, int srcIndex, int length)
+ public void setBytes(final int index, final ChannelBuffer src, final int srcIndex, final int length)
{
buffer.setBytes(index, src, srcIndex, length);
}
- public void setBytes(int index, ByteBuffer src)
+ public void setBytes(final int index, final ByteBuffer src)
{
buffer.setBytes(index, src);
}
- public int setBytes(int index, InputStream in, int length) throws IOException
+ public int setBytes(final int index, final InputStream in, final int length) throws IOException
{
return buffer.setBytes(index, in, length);
}
- public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException
+ public int setBytes(final int index, final ScatteringByteChannel in, final int length) throws IOException
{
return buffer.setBytes(index, in, length);
}
@Override
- public void writeByte(byte value)
+ public void writeByte(final byte value)
{
ensureWritableBytes(1);
super.writeByte(value);
}
@Override
- public void writeShort(short value)
+ public void writeShort(final short value)
{
ensureWritableBytes(2);
super.writeShort(value);
}
@Override
- public void writeMedium(int value)
+ public void writeMedium(final int value)
{
ensureWritableBytes(3);
super.writeMedium(value);
}
@Override
- public void writeInt(int value)
+ public void writeInt(final int value)
{
ensureWritableBytes(4);
super.writeInt(value);
}
@Override
- public void writeLong(long value)
+ public void writeLong(final long value)
{
ensureWritableBytes(8);
super.writeLong(value);
}
@Override
- public void writeBytes(byte[] src, int srcIndex, int length)
+ public void writeBytes(final byte[] src, final int srcIndex, final int length)
{
ensureWritableBytes(length);
super.writeBytes(src, srcIndex, length);
}
@Override
- public void writeBytes(ChannelBuffer src, int srcIndex, int length)
+ public void writeBytes(final ChannelBuffer src, final int srcIndex, final int length)
{
ensureWritableBytes(length);
super.writeBytes(src, srcIndex, length);
}
@Override
- public void writeBytes(ByteBuffer src)
+ public void writeBytes(final ByteBuffer src)
{
ensureWritableBytes(src.remaining());
super.writeBytes(src);
}
@Override
- public void writeZero(int length)
+ public void writeZero(final int length)
{
ensureWritableBytes(length);
super.writeZero(length);
}
- public ByteBuffer toByteBuffer(int index, int length)
+ public ByteBuffer toByteBuffer(final int index, final int length)
{
return buffer.toByteBuffer(index, length);
}
- public String toString(int index, int length, String charsetName)
+ public String toString(final int index, final int length, final String charsetName)
{
return buffer.toString(index, length, charsetName);
}
- private void ensureWritableBytes(int requestedBytes)
+ private void ensureWritableBytes(final int requestedBytes)
{
if (requestedBytes <= writableBytes())
{
@@ -264,4 +273,12 @@
newBuffer.writeBytes(buffer, readerIndex(), readableBytes());
buffer = newBuffer;
}
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.buffers.AbstractChannelBuffer#array()
+ */
+ public byte[] array()
+ {
+ return buffer.array();
+ }
}
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/HeapChannelBuffer.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/HeapChannelBuffer.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/buffers/HeapChannelBuffer.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -33,8 +33,6 @@
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.UnsupportedCharsetException;
-import org.jboss.messaging.util.SimpleString;
-
/**
* A skeletal implementation for Java heap buffers.
*
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/AbstractBufferHandler.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/AbstractBufferHandler.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/AbstractBufferHandler.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -21,10 +21,9 @@
*/
package org.jboss.messaging.core.remoting.impl;
-import static org.jboss.messaging.util.DataConstants.SIZE_INT;
-
import org.jboss.messaging.core.remoting.spi.BufferHandler;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
/**
* A AbstractBufferHandler
@@ -36,14 +35,14 @@
{
public int isReadyToHandle(final MessagingBuffer buffer)
{
- if (buffer.remaining() <= SIZE_INT)
+ if (buffer.readableBytes() < DataConstants.SIZE_INT)
{
return -1;
}
int length = buffer.readInt();
- if (buffer.remaining() < length)
+ if (buffer.readableBytes() < length)
{
return -1;
}
Deleted: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/ByteBufferWrapper.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/ByteBufferWrapper.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/ByteBufferWrapper.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -1,362 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl;
-
-import static org.jboss.messaging.util.DataConstants.FALSE;
-import static org.jboss.messaging.util.DataConstants.NOT_NULL;
-import static org.jboss.messaging.util.DataConstants.NULL;
-import static org.jboss.messaging.util.DataConstants.TRUE;
-
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.Charset;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.UTF8Util;
-
-/**
- *
- * A ByteBufferWrapper
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
- *
- */
-public class ByteBufferWrapper implements MessagingBuffer
-{
- private static final Charset utf8 = Charset.forName("UTF-8");
-
- private ByteBuffer buffer;
-
- public ByteBufferWrapper(final ByteBuffer buffer)
- {
- this.buffer = buffer;
- }
-
- public ByteBuffer getBuffer()
- {
- return buffer;
- }
-
- public byte[] array()
- {
- if(buffer.hasArray())
- {
- return buffer.array();
- }
- else
- {
- byte[] b = new byte[buffer.limit()];
- buffer.get(b);
- return b;
- }
- }
-
- public int position()
- {
- return buffer.position();
- }
-
- public void position(final int position)
- {
- buffer.position(position);
- }
-
- public int capacity()
- {
- return buffer.capacity();
- }
-
- public void flip()
- {
- buffer.flip();
- }
-
- public MessagingBuffer slice()
- {
- return new ByteBufferWrapper(buffer.slice());
- }
-
- public MessagingBuffer createNewBuffer(int len)
- {
- return new ByteBufferWrapper(ByteBuffer.allocate(len));
- }
-
- public void rewind()
- {
- buffer.rewind();
- }
-
- public boolean readBoolean()
- {
- byte b = buffer.get();
- return (b == TRUE);
- }
-
- public byte readByte()
- {
- return buffer.get();
- }
-
- public short readUnsignedByte()
- {
- return (short)(buffer.get() & 0xFF);
- }
-
- public void readBytes(byte[] bytes)
- {
- buffer.get(bytes);
- }
-
- public void readBytes(byte[] bytes, int offset, int length)
- {
- buffer.get(bytes, offset, length);
- }
-
- public double readDouble()
- {
- return buffer.getDouble();
- }
-
- public float readFloat()
- {
- return buffer.getFloat();
- }
-
- public int readInt()
- {
- return buffer.getInt();
- }
-
- public long readLong()
- {
- return buffer.getLong();
- }
-
- public void writeNullableString(final String nullableString)
- {
- if (nullableString == null)
- {
- buffer.put(NULL);
- }
- else
- {
- buffer.put(NOT_NULL);
-
- writeString(nullableString);
- }
- }
-
- public String readNullableString()
- {
- byte check = buffer.get();
-
- if (check == NULL)
- {
- return null;
- }
- else
- {
- return readString();
- }
- }
-
- public void writeString(final String nullableString)
- {
- //We don't encode
-
- buffer.putInt(nullableString.length());
-
- for (int i = 0; i < nullableString.length(); i++)
- {
- buffer.putChar(nullableString.charAt(i));
- }
- }
-
- public void writeUTF(final String str) throws Exception
- {
- UTF8Util.saveUTF(this, str);
- }
-
- public short readShort()
- {
- return buffer.getShort();
- }
-
- public int readUnsignedShort()
- {
- return buffer.getShort() & 0xFFFF;
- }
-
- public char readChar()
- {
- return buffer.getChar();
- }
-
- public String readString()
- {
- int len = buffer.getInt();
-
- char[] chars = new char[len];
-
- for (int i = 0; i < len; i++)
- {
- chars[i] = buffer.getChar();
- }
-
- return new String(chars);
- }
-
- public void writeSimpleString(final SimpleString string)
- {
- byte[] data = string.getData();
-
- buffer.putInt(data.length);
- buffer.put(data);
- }
-
- public SimpleString readNullableSimpleString()
- {
- int b = buffer.get();
- if (b == NULL)
- {
- return null;
- }
- else
- {
- return readSimpleString();
- }
- }
-
- public void writeNullableSimpleString(final SimpleString string)
- {
- if (string == null)
- {
- buffer.put(NULL);
- }
- else
- {
- buffer.put(NOT_NULL);
- writeSimpleString(string);
- }
- }
-
- public SimpleString readSimpleString()
- {
- int len = buffer.getInt();
-
- byte[] data = new byte[len];
- buffer.get(data);
-
- return new SimpleString(data);
- }
-
- public String readUTF() throws Exception
- {
- return UTF8Util.readUTF(this);
- }
-
- public int limit()
- {
- return buffer.limit();
- }
-
- public void limit(final int limit)
- {
- buffer.limit(limit);
- }
-
- public void writeBoolean(boolean val)
- {
- if (val)
- {
- buffer.put(TRUE);
- }
- else
- {
- buffer.put(FALSE);
- }
- }
-
- public void writeByte(byte val)
- {
- buffer.put(val);
- }
-
- public void writeBytes(byte[] bytes)
- {
- buffer.put(bytes);
- }
-
- public void writeBytes(byte[] bytes, int offset, int len)
- {
- buffer.put(bytes, offset, len);
- }
-
- public void writeDouble(double val)
- {
- buffer.putDouble(val);
- }
-
- public void writeFloat(float val)
- {
- buffer.putFloat(val);
- }
-
- public void writeInt(int val)
- {
- buffer.putInt(val);
- }
-
- public void setInt(int pos, int val)
- {
- buffer.putInt(pos, val);
- }
-
- public void writeLong(long val)
- {
- buffer.putLong(val);
- }
-
- public void writeShort(short val)
- {
- buffer.putShort(val);
- }
-
- public void writeChar(char chr)
- {
- buffer.putChar(chr);
- }
-
- public int remaining()
- {
- return buffer.remaining();
- }
-
- public Object getUnderlyingBuffer()
- {
- return buffer;
- }
-
-}
Deleted: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/ExpandingMessagingBuffer.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/ExpandingMessagingBuffer.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/ExpandingMessagingBuffer.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -1,380 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.remoting.impl;
-
-import static org.jboss.messaging.util.DataConstants.NOT_NULL;
-import static org.jboss.messaging.util.DataConstants.NULL;
-
-import java.nio.ByteBuffer;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.UTF8Util;
-
-/**
- * A {@link MessagingBuffer} which increases its capacity and length by itself
- * when there's not enough space for a {@code put} operation.
- *
- * @author <a href="mailto:tlee at redhat.com">Trustin Lee</a>
- * @version $Rev$, $Date$
- */
-public class ExpandingMessagingBuffer implements MessagingBuffer
-{
- private ByteBuffer buf;
-
- public ExpandingMessagingBuffer(ByteBuffer buf) {
- this.buf = buf;
- }
-
- public ExpandingMessagingBuffer(int size) {
- this(ByteBuffer.allocate(size));
- }
-
- public byte[] array()
- {
- if(buf.hasArray() && buf.arrayOffset() == 0 && buf.capacity() == buf.array().length)
- {
- return buf.array();
- }
- else
- {
- byte[] b = new byte[remaining()];
- readBytes(b);
- return b;
- }
- }
-
- public int capacity()
- {
- return buf.capacity();
- }
-
- public MessagingBuffer createNewBuffer(int len)
- {
- return new ExpandingMessagingBuffer(len);
- }
-
- public void flip()
- {
- buf.flip();
- }
-
- public boolean readBoolean()
- {
- return readByte() != 0;
- }
-
- public byte readByte()
- {
- return buf.get();
- }
-
- public void readBytes(byte[] bytes)
- {
- buf.get(bytes);
- }
-
- public void readBytes(byte[] bytes, int offset, int length)
- {
- buf.get(bytes, offset, length);
- }
-
- public char readChar()
- {
- return buf.getChar();
- }
-
- public double readDouble()
- {
- return buf.getDouble();
- }
-
- public float readFloat()
- {
- return buf.getFloat();
- }
-
- public int readInt()
- {
- return buf.getInt();
- }
-
- public long readLong()
- {
- return buf.getLong();
- }
-
- public SimpleString readNullableSimpleString()
- {
- if (readByte() == NULL)
- {
- return null;
- }
- else
- {
- return readSimpleString();
- }
- }
-
- public String readNullableString()
- {
- if (readByte() == NULL)
- {
- return null;
- }
- else
- {
- return readString();
- }
- }
-
- public short readShort()
- {
- return buf.getShort();
- }
-
- public SimpleString readSimpleString()
- {
- int len = readInt();
- byte[] data = new byte[len];
- readBytes(data);
- return new SimpleString(data);
- }
-
- public String readString()
- {
- int len = readInt();
- char[] chars = new char[len];
- for (int i = 0; i < len; i++)
- {
- chars[i] = readChar();
- }
- return new String(chars);
- }
-
- public String readUTF() throws Exception
- {
- return UTF8Util.readUTF(this);
- }
-
- public short readUnsignedByte()
- {
- return (short) (readByte() & 0xFF);
- }
-
- public int readUnsignedShort()
- {
- return readShort() & 0xFFFF;
- }
-
- public int limit()
- {
- return buf.limit();
- }
-
- public void limit(int limit)
- {
- buf.limit(limit);
- }
-
- public void position(int position)
- {
- buf.position(position);
- }
-
- public int position()
- {
- return buf.position();
- }
-
- public void writeBoolean(boolean val)
- {
- writeByte((byte) (val ? -1 : 0));
- }
-
- public void writeByte(byte val)
- {
- ensureRemaining(1).put(val);
- }
-
- public void writeBytes(byte[] bytes)
- {
- ensureRemaining(bytes.length).put(bytes);
- }
-
- public void writeBytes(byte[] bytes, int offset, int length)
- {
- ensureRemaining(length).put(bytes, offset, length);
- }
-
- public void writeChar(char val)
- {
- ensureRemaining(2).putChar(val);
- }
-
- public void writeDouble(double val)
- {
- ensureRemaining(8).putDouble(val);
- }
-
- public void writeFloat(float val)
- {
- ensureRemaining(4).putFloat(val);
- }
-
- public void writeInt(int val)
- {
- ensureRemaining(4).putInt(val);
- }
-
- public void setInt(int pos, int val)
- {
- buf.putInt(pos, val);
- }
-
- public void writeLong(long val)
- {
- ensureRemaining(8).putLong(val);
- }
-
- public void writeNullableSimpleString(SimpleString val)
- {
- if (val == null)
- {
- ensureRemaining(1).put(NULL);
- }
- else
- {
- ensureRemaining(5 + (val.length() << 1));
- buf.put(NOT_NULL);
- byte[] data = val.getData();
- ensureRemaining(data.length + 4);
- buf.putInt(data.length);
- buf.put(data);
- }
- }
-
- public void writeNullableString(String val)
- {
- if (val == null)
- {
- ensureRemaining(1).put(NULL);
- }
- else
- {
- ensureRemaining(5 + (val.length() << 1));
- buf.put(NOT_NULL);
- buf.putInt(val.length());
- for (int i = 0; i < val.length(); i++)
- {
- buf.putChar(val.charAt(i));
- }
- }
- }
-
- public void writeShort(short val)
- {
- ensureRemaining(2).putShort(val);
- }
-
- public void writeSimpleString(SimpleString val)
- {
- byte[] data = val.getData();
- ensureRemaining(4 + data.length);
- buf.putInt(data.length);
- buf.put(data);
- }
-
- public void writeString(String val)
- {
- ensureRemaining(4 + (val.length() << 1));
- buf.putInt(val.length());
- for (int i = 0; i < val.length(); i++)
- {
- buf.putChar(val.charAt(i));
- }
- }
-
- public void writeUTF(String utf) throws Exception
- {
- UTF8Util.saveUTF(this, utf);
- }
-
- public int remaining()
- {
- return buf.remaining();
- }
-
- public void rewind()
- {
- buf.rewind();
- }
-
- public MessagingBuffer slice()
- {
- return new ExpandingMessagingBuffer(buf.slice());
- }
-
- public Object getUnderlyingBuffer()
- {
- return buf;
- }
-
- private ByteBuffer ensureRemaining(int minRemaining)
- {
- int remaining = remaining();
- if (remaining >= minRemaining)
- {
- return buf;
- }
-
- int capacity = capacity();
- int limit = limit();
- if (capacity - limit >= minRemaining - remaining) {
- buf.limit(limit + minRemaining - remaining);
- return buf;
- }
-
- int position = position();
- int minCapacityDifference = minRemaining - remaining;
- int oldCapacity = capacity();
- int newCapacity = oldCapacity;
- for (;;)
- {
- newCapacity <<= 1;
- if (newCapacity - oldCapacity >= minCapacityDifference)
- {
- break;
- }
- }
-
- ByteBuffer newBuf = ByteBuffer.allocate(newCapacity);
- buf.clear();
- newBuf.put(buf);
- newBuf.limit(position + minRemaining);
- newBuf.position(position);
- buf = newBuf;
-
- return newBuf;
- }
-}
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -27,7 +27,7 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.buffers.HeapChannelBuffer;
import org.jboss.messaging.core.remoting.spi.BufferHandler;
import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
@@ -101,7 +101,7 @@
public MessagingBuffer createBuffer(final int size)
{
- return new ByteBufferWrapper(ByteBuffer.allocate(size));
+ return new HeapChannelBuffer(size);
}
public Object getID()
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -183,14 +183,12 @@
encodeBody(buffer);
- size = buffer.position();
+ size = buffer.writerIndex();
// The length doesn't include the actual length byte
int len = size - DataConstants.SIZE_INT;
buffer.setInt(0, len);
-
- buffer.flip();
return size;
}
@@ -201,7 +199,7 @@
decodeBody(buffer);
- size = buffer.position();
+ size = buffer.capacity();
}
public final int getPacketSize()
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -183,7 +183,7 @@
{
clientMessage = new ClientMessageImpl(deliveryCount);
clientMessage.decode(buffer);
- clientMessage.getBody().flip();
+ clientMessage.getBody().resetReaderIndex();
}
}
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -211,7 +211,7 @@
serverMessage.decode(buffer);
- serverMessage.getBody().flip();
+ serverMessage.getBody().resetReaderIndex();
requiresResponse = buffer.readBoolean();
}
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -45,7 +45,8 @@
import org.jboss.messaging.core.postoffice.QueueBinding;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.buffers.ByteBufferBackedChannelBuffer;
+import org.jboss.messaging.core.remoting.buffers.HeapChannelBuffer;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
@@ -62,7 +63,6 @@
import org.jboss.messaging.core.server.ServerSession;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
-import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.TypedProperties;
/**
@@ -834,7 +834,7 @@
{
sentFirstMessage = true;
- MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(pendingLargeMessage.getPropertiesEncodeSize()));
+ MessagingBuffer headerBuffer = new HeapChannelBuffer(pendingLargeMessage.getPropertiesEncodeSize());
pendingLargeMessage.encodeProperties(headerBuffer);
@@ -984,7 +984,7 @@
localChunkLen = (int)Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
- MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(localChunkLen));
+ MessagingBuffer bodyBuffer = new HeapChannelBuffer(localChunkLen);
pendingLargeMessage.encodeBody(bodyBuffer, positionPendingLargeMessage, localChunkLen);
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -46,7 +46,7 @@
import org.jboss.messaging.core.remoting.FailureListener;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.buffers.DynamicChannelBuffer;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
@@ -2714,7 +2714,7 @@
{
LargeServerMessage largeMessage = storageManager.createLargeMessage();
- MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.wrap(header));
+ MessagingBuffer headerBuffer = new DynamicChannelBuffer(header);
largeMessage.decodeProperties(headerBuffer);
Added: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/integration/transports/netty/ChannelBufferExtends.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/integration/transports/netty/ChannelBufferExtends.java (rev 0)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/integration/transports/netty/ChannelBufferExtends.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -0,0 +1,263 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.integration.transports.netty;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.UTF8Util;
+
+/**
+ * A ChannelBufferExtends
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Feb 25, 2009 4:20:05 PM
+ *
+ *
+ */
+public class ChannelBufferExtends extends org.jboss.netty.buffer.BigEndianHeapChannelBuffer implements MessagingBuffer
+{
+ public ChannelBufferExtends(int length)
+ {
+ super(length);
+ }
+
+ /**
+ * @param array
+ */
+ public ChannelBufferExtends(byte[] array)
+ {
+ super(array);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#array()
+ */
+ public byte[] array()
+ {
+ return array;
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#readBoolean()
+ */
+ public boolean readBoolean()
+ {
+ return readByte() != 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#readChar()
+ */
+ public char readChar()
+ {
+ return (char)readShort();
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#readDouble()
+ */
+ public double readDouble()
+ {
+ return Double.longBitsToDouble(readLong());
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#readFloat()
+ */
+ public float readFloat()
+ {
+ return Float.intBitsToFloat(readInt());
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#readNullableSimpleString()
+ */
+ public SimpleString readNullableSimpleString()
+ {
+ int b = readByte();
+ if (b == DataConstants.NULL)
+ {
+ return null;
+ }
+ else
+ {
+ return readSimpleString();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#readNullableString()
+ */
+ public String readNullableString()
+ {
+ int b = readByte();
+ if (b == DataConstants.NULL)
+ {
+ return null;
+ }
+ else
+ {
+ return readString();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#readSimpleString()
+ */
+ public SimpleString readSimpleString()
+ {
+ int len = readInt();
+ byte[] data = new byte[len];
+ readBytes(data);
+ return new SimpleString(data);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#readString()
+ */
+ public String readString()
+ {
+ int len = readInt();
+ char[] chars = new char[len];
+ for (int i = 0; i < len; i++)
+ {
+ chars[i] = readChar();
+ }
+ return new String(chars);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#readUTF()
+ */
+ public String readUTF() throws Exception
+ {
+ return UTF8Util.readUTF(this);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#writeBoolean(boolean)
+ */
+ public void writeBoolean(final boolean val)
+ {
+ writeByte((byte)(val ? -1 : 0));
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#writeChar(char)
+ */
+ public void writeChar(final char val)
+ {
+ writeShort((short)val);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#writeDouble(double)
+ */
+ public void writeDouble(final double val)
+ {
+ writeLong(Double.doubleToLongBits(val));
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#writeFloat(float)
+ */
+ public void writeFloat(final float val)
+ {
+ writeInt(Float.floatToIntBits(val));
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#writeNullableSimpleString(org.jboss.messaging.util.SimpleString)
+ */
+ public void writeNullableSimpleString(final SimpleString val)
+ {
+ if (val == null)
+ {
+ writeByte(DataConstants.NULL);
+ }
+ else
+ {
+ writeByte(DataConstants.NOT_NULL);
+ writeSimpleString(val);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#writeNullableString(java.lang.String)
+ */
+ public void writeNullableString(final String val)
+ {
+ if (val == null)
+ {
+ writeByte(DataConstants.NULL);
+ }
+ else
+ {
+ writeByte(DataConstants.NOT_NULL);
+ writeString(val);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#writeSimpleString(org.jboss.messaging.util.SimpleString)
+ */
+ public void writeSimpleString(final SimpleString val)
+ {
+ byte[] data = val.getData();
+ writeInt(data.length);
+ writeBytes(data);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#writeString(java.lang.String)
+ */
+ public void writeString(final String val)
+ {
+ writeInt(val.length());
+ for (int i = 0; i < val.length(); i++)
+ {
+ writeShort((short)val.charAt(i));
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#writeUTF(java.lang.String)
+ */
+ public void writeUTF(final String utf) throws Exception
+ {
+ UTF8Util.saveUTF(this, utf);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#getUnderlyingBuffer()
+ */
+ public Object getUnderlyingBuffer()
+ {
+ return this;
+ }
+
+}
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/integration/transports/netty/ChannelBufferWrapper.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/integration/transports/netty/ChannelBufferWrapper.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/integration/transports/netty/ChannelBufferWrapper.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -22,16 +22,8 @@
package org.jboss.messaging.integration.transports.netty;
-import static org.jboss.messaging.util.DataConstants.FALSE;
-import static org.jboss.messaging.util.DataConstants.NOT_NULL;
-import static org.jboss.messaging.util.DataConstants.NULL;
-import static org.jboss.messaging.util.DataConstants.TRUE;
-import static org.jboss.netty.buffer.ChannelBuffers.copiedBuffer;
-import static org.jboss.netty.buffer.ChannelBuffers.dynamicBuffer;
-
-import java.nio.BufferUnderflowException;
-
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.UTF8Util;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -46,404 +38,371 @@
* @author <a href="mailto:tlee at redhat.com">Trustin Lee</a>
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
* @version $Rev$, $Date$
*/
public class ChannelBufferWrapper implements MessagingBuffer
{
- // Constants -----------------------------------------------------
- // Attributes ----------------------------------------------------
-
private final ChannelBuffer buffer;
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public ChannelBufferWrapper(final int size)
+ /**
+ * @param buffer
+ */
+ public ChannelBufferWrapper(final ChannelBuffer buffer)
{
- buffer = dynamicBuffer(size);
- buffer.writerIndex(buffer.capacity());
+ super();
+ this.buffer = buffer;
}
- public ChannelBufferWrapper(final ChannelBuffer buffer)
+ public int capacity()
{
- this.buffer = buffer;
+ return buffer.capacity();
}
- // Public --------------------------------------------------------
-
- // MessagingBuffer implementation ----------------------------------------------
-
- public byte[] array()
+ public void clear()
{
- return buffer.toByteBuffer().array();
+ buffer.clear();
}
- public int position()
+ public boolean readable()
{
- return buffer.readerIndex();
+ return buffer.readable();
}
- public void position(final int position)
+ public int readableBytes()
{
- buffer.readerIndex(position);
+ return buffer.readableBytes();
}
- public int limit()
+ public byte readByte()
{
- return buffer.writerIndex();
+ return buffer.readByte();
}
- public void limit(final int limit)
+ public void readBytes(final byte[] dst, final int dstIndex, final int length)
{
- buffer.writerIndex(limit);
+ buffer.readBytes(dst, dstIndex, length);
}
- public int capacity()
+ public void readBytes(final byte[] dst)
{
- return buffer.capacity();
+ buffer.readBytes(dst);
}
- public void flip()
+ public int readerIndex()
{
- int oldPosition = position();
- position(0);
- limit(oldPosition);
+ return buffer.readerIndex();
}
- public MessagingBuffer slice()
+ public void readerIndex(final int readerIndex)
{
- return new ChannelBufferWrapper(buffer.slice());
+ buffer.readerIndex(readerIndex);
}
- public MessagingBuffer createNewBuffer(int len)
+ public int readInt()
{
- return new ChannelBufferWrapper(len);
+ return buffer.readInt();
}
- public int remaining()
+ public long readLong()
{
- return buffer.readableBytes();
+ return buffer.readLong();
}
- public void rewind()
+ public short readShort()
{
- position(0);
- buffer.markReaderIndex();
+ return buffer.readShort();
}
- public void writeByte(byte byteValue)
+ public short readUnsignedByte()
{
- flip();
- buffer.writeByte(byteValue);
- buffer.readerIndex(buffer.writerIndex());
+ return buffer.readUnsignedByte();
}
- public void writeBytes(final byte[] byteArray)
+ public int readUnsignedShort()
{
- flip();
- buffer.writeBytes(byteArray);
- buffer.readerIndex(buffer.writerIndex());
+ return buffer.readUnsignedShort();
}
- public void writeBytes(final byte[] bytes, int offset, int length)
+ public void resetReaderIndex()
{
- flip();
- buffer.writeBytes(bytes, offset, length);
- buffer.readerIndex(buffer.writerIndex());
+ buffer.resetReaderIndex();
}
- public void writeInt(final int intValue)
+ public void resetWriterIndex()
{
- flip();
- buffer.writeInt(intValue);
- buffer.readerIndex(buffer.writerIndex());
+ buffer.resetWriterIndex();
}
- public void setInt(final int pos, final int intValue)
+ public void setIndex(final int readerIndex, final int writerIndex)
{
- buffer.setInt(pos, intValue);
+ buffer.setIndex(readerIndex, writerIndex);
}
- public void writeLong(final long longValue)
+ public void setInt(final int index, final int value)
{
- flip();
- buffer.writeLong(longValue);
- buffer.readerIndex(buffer.writerIndex());
+ buffer.setInt(index, value);
}
- public void writeFloat(final float floatValue)
+ public boolean writable()
{
- writeInt(Float.floatToIntBits(floatValue));
+ return buffer.writable();
}
- public void writeDouble(final double d)
+ public int writableBytes()
{
- writeLong(Double.doubleToLongBits(d));
+ return buffer.writableBytes();
}
- public void writeShort(final short s)
+ public void writeByte(final byte value)
{
- flip();
- buffer.writeShort(s);
- buffer.readerIndex(buffer.writerIndex());
+ buffer.writeByte(value);
}
- public void writeChar(final char chr)
+ public void writeBytes(final byte[] src, final int srcIndex, final int length)
{
- writeShort((short)chr);
+ buffer.writeBytes(src, srcIndex, length);
}
- public byte readByte()
+ public void writeBytes(final byte[] src)
{
- try
- {
- return buffer.readByte();
- }
- catch (IndexOutOfBoundsException e)
- {
- throw new BufferUnderflowException();
- }
+ buffer.writeBytes(src);
}
- public short readUnsignedByte()
+ public void writeInt(final int value)
{
- try
- {
- return buffer.readUnsignedByte();
- }
- catch (IndexOutOfBoundsException e)
- {
- throw new BufferUnderflowException();
- }
+ buffer.writeInt(value);
}
- public void readBytes(final byte[] b)
+ public void writeLong(final long value)
{
- try
- {
- buffer.readBytes(b);
- }
- catch (IndexOutOfBoundsException e)
- {
- throw new BufferUnderflowException();
- }
+ buffer.writeLong(value);
}
- public void readBytes(final byte[] b, final int offset, final int length)
+ public int writerIndex()
{
- try
- {
- buffer.readBytes(b, offset, length);
- }
- catch (IndexOutOfBoundsException e)
- {
- throw new BufferUnderflowException();
- }
+ return buffer.writerIndex();
}
- public int readInt()
+ public void writerIndex(final int writerIndex)
{
- try
- {
- return buffer.readInt();
- }
- catch (IndexOutOfBoundsException e)
- {
- throw new BufferUnderflowException();
- }
+ buffer.writerIndex(writerIndex);
}
- public long readLong()
+ public void writeShort(final short value)
{
- try
- {
- return buffer.readLong();
- }
- catch (IndexOutOfBoundsException e)
- {
- throw new BufferUnderflowException();
- }
+ buffer.writeShort(value);
}
- public float readFloat()
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#array()
+ */
+ public byte[] array()
{
- return Float.intBitsToFloat(readInt());
+ return buffer.toByteBuffer().array();
}
- public short readShort()
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#readBoolean()
+ */
+ public boolean readBoolean()
{
- try
- {
- return buffer.readShort();
- }
- catch (IndexOutOfBoundsException e)
- {
- throw new BufferUnderflowException();
- }
+ return readByte() != 0;
}
- public int readUnsignedShort()
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#readChar()
+ */
+ public char readChar()
{
- try
- {
- return buffer.readUnsignedShort();
- }
- catch (IndexOutOfBoundsException e)
- {
- throw new BufferUnderflowException();
- }
+ return (char)readShort();
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#readDouble()
+ */
public double readDouble()
{
return Double.longBitsToDouble(readLong());
}
- public char readChar()
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#readFloat()
+ */
+ public float readFloat()
{
- return (char)readShort();
+ return Float.intBitsToFloat(readInt());
}
- public void writeBoolean(final boolean b)
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#readNullableSimpleString()
+ */
+ public SimpleString readNullableSimpleString()
{
- if (b)
+ int b = readByte();
+ if (b == DataConstants.NULL)
{
- writeByte(TRUE);
+ return null;
}
else
{
- writeByte(FALSE);
+ return readSimpleString();
}
}
- public boolean readBoolean()
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#readNullableString()
+ */
+ public String readNullableString()
{
- byte b = readByte();
- return b == TRUE;
- }
-
- public void writeString(final String nullableString)
- {
- flip();
- buffer.writeInt(nullableString.length());
- for (int i = 0; i < nullableString.length(); i++)
+ int b = readByte();
+ if (b == DataConstants.NULL)
{
- buffer.writeShort((short)nullableString.charAt(i));
+ return null;
}
- buffer.readerIndex(buffer.writerIndex());
- }
-
- public void writeNullableString(final String nullableString)
- {
- if (nullableString == null)
- {
- writeByte(NULL);
- }
else
{
- writeByte(NOT_NULL);
- writeString(nullableString);
+ return readString();
}
}
- public String readString()
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#readSimpleString()
+ */
+ public SimpleString readSimpleString()
{
int len = readInt();
+ byte[] data = new byte[len];
+ readBytes(data);
+ return new SimpleString(data);
+ }
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#readString()
+ */
+ public String readString()
+ {
+ int len = readInt();
char[] chars = new char[len];
-
for (int i = 0; i < len; i++)
{
chars[i] = readChar();
}
-
return new String(chars);
}
- public String readNullableString()
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#readUTF()
+ */
+ public String readUTF() throws Exception
{
- byte check = readByte();
-
- if (check == NULL)
- {
- return null;
- }
- else
- {
- return readString();
- }
+ return UTF8Util.readUTF(this);
}
- public void writeUTF(final String str) throws Exception
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#writeBoolean(boolean)
+ */
+ public void writeBoolean(final boolean val)
{
- UTF8Util.saveUTF(this, str);
- buffer.readerIndex(buffer.writerIndex());
+ writeByte((byte)(val ? -1 : 0));
}
- public void writeNullableSimpleString(final SimpleString string)
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#writeChar(char)
+ */
+ public void writeChar(final char val)
{
- if (string == null)
- {
- writeByte(NULL);
- }
- else
- {
- writeByte(NOT_NULL);
- writeSimpleString(string);
- }
+ writeShort((short)val);
}
- public void writeSimpleString(final SimpleString string)
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#writeDouble(double)
+ */
+ public void writeDouble(final double val)
{
- byte[] data = string.getData();
+ writeLong(Double.doubleToLongBits(val));
- flip();
- buffer.writeInt(data.length);
- buffer.writeBytes(data);
- buffer.readerIndex(buffer.writerIndex());
}
- public SimpleString readSimpleString()
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#writeFloat(float)
+ */
+ public void writeFloat(final float val)
{
- int len = readInt();
+ writeInt(Float.floatToIntBits(val));
- byte[] data = new byte[len];
- readBytes(data);
+ }
- return new SimpleString(data);
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#writeNullableSimpleString(org.jboss.messaging.util.SimpleString)
+ */
+ public void writeNullableSimpleString(final SimpleString val)
+ {
+ if (val == null)
+ {
+ writeByte(DataConstants.NULL);
+ }
+ else
+ {
+ writeByte(DataConstants.NOT_NULL);
+ writeSimpleString(val);
+ }
}
- public SimpleString readNullableSimpleString()
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#writeNullableString(java.lang.String)
+ */
+ public void writeNullableString(final String val)
{
- int b = readByte();
- if (b == NULL)
+ if (val == null)
{
- return null;
+ writeByte(DataConstants.NULL);
}
else
{
- return readSimpleString();
+ writeByte(DataConstants.NOT_NULL);
+ writeString(val);
}
}
- public String readUTF() throws Exception
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#writeSimpleString(org.jboss.messaging.util.SimpleString)
+ */
+ public void writeSimpleString(final SimpleString val)
{
- return UTF8Util.readUTF(this);
+ byte[] data = val.getData();
+ writeInt(data.length);
+ writeBytes(data);
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#writeString(java.lang.String)
+ */
+ public void writeString(final String val)
+ {
+ writeInt(val.length());
+ for (int i = 0; i < val.length(); i++)
+ {
+ writeShort((short)val.charAt(i));
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#writeUTF(java.lang.String)
+ */
+ public void writeUTF(final String utf) throws Exception
+ {
+ UTF8Util.saveUTF(this, utf);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.remoting.spi.MessagingBuffer#getUnderlyingBuffer()
+ */
public Object getUnderlyingBuffer()
{
return buffer;
}
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
\ No newline at end of file
+}
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -26,6 +26,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.spi.BufferHandler;
import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
@@ -58,8 +59,14 @@
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
{
- ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
- handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer));
+ if (e.getMessage() instanceof MessagingBuffer)
+ {
+ handler.bufferReceived(e.getChannel().getId(), (MessagingBuffer)e.getMessage());
+ }
+ else
+ {
+ handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper((ChannelBuffer)e.getMessage()));
+ }
}
@Override
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/integration/transports/netty/MessagingFrameDecoder.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/integration/transports/netty/MessagingFrameDecoder.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/integration/transports/netty/MessagingFrameDecoder.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -26,6 +26,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.spi.BufferHandler;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
@@ -60,7 +61,16 @@
// TODO - we can avoid this entirely if we maintain fragmented packets in the handler
int start = in.readerIndex();
- int length = handler.isReadyToHandle(new ChannelBufferWrapper(in));
+ int length;
+
+ if (in instanceof MessagingBuffer)
+ {
+ length = handler.isReadyToHandle((MessagingBuffer)in);
+ }
+ else
+ {
+ length = handler.isReadyToHandle(new ChannelBufferWrapper(in));
+ }
if (length == -1)
{
in.readerIndex(start);
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -27,6 +27,7 @@
import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.handler.ssl.SslHandler;
@@ -115,7 +116,8 @@
public MessagingBuffer createBuffer(int size)
{
- return new ChannelBufferWrapper(size);
+ return new ChannelBufferWrapper(ChannelBuffers.dynamicBuffer(size));
+ //return new ChannelBufferExtends(size);
}
public Object getID()
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/jms/client/JBossBytesMessage.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/jms/client/JBossBytesMessage.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/jms/client/JBossBytesMessage.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -258,9 +258,9 @@
{
checkRead();
- if (getBody().remaining() == 0) { return -1; }
+ if (!getBody().readable()) { return -1; }
- int read = Math.min(length, getBody().remaining());
+ int read = Math.min(length, getBody().readableBytes());
if (read != 0)
{
@@ -402,11 +402,11 @@
{
readOnly = true;
- getBody().flip();
+ getBody().resetReaderIndex();
}
else
{
- getBody().rewind();
+ getBody().resetReaderIndex();
}
}
@@ -416,14 +416,14 @@
{
super.clearBody();
MessagingBuffer currentBody = message.getBody();
- message.setBody(currentBody.createNewBuffer(1024));
+ currentBody.clear();
}
public long getBodyLength() throws JMSException
{
checkRead();
- return getBody().limit();
+ return getBody().writerIndex();
}
public void doBeforeSend() throws Exception
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/jms/client/JBossMessage.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/jms/client/JBossMessage.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/jms/client/JBossMessage.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -12,7 +12,6 @@
package org.jboss.messaging.jms.client;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
@@ -34,7 +33,7 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.impl.MessageImpl;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.buffers.DynamicChannelBuffer;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.jms.JBossDestination;
import org.jboss.messaging.util.SimpleString;
@@ -183,7 +182,7 @@
0,
System.currentTimeMillis(),
(byte)4,
- new ByteBufferWrapper(ByteBuffer.allocate(1024)));
+ new DynamicChannelBuffer(1024));
}
@@ -194,7 +193,7 @@
0,
System.currentTimeMillis(),
(byte)4,
- new ByteBufferWrapper(ByteBuffer.allocate(1024)));
+ new DynamicChannelBuffer(1024));
}
@@ -903,7 +902,7 @@
public void doBeforeSend() throws Exception
{
- message.getBody().flip();
+ message.getBody().resetReaderIndex();
}
public void doBeforeReceive() throws Exception
@@ -911,7 +910,7 @@
MessagingBuffer body = message.getBody();
if (body != null)
{
- body.rewind();
+ body.resetReaderIndex();
}
}
Modified: branches/JBMESSAGING_1394/src/main/org/jboss/messaging/jms/client/JBossStreamMessage.java
===================================================================
--- branches/JBMESSAGING_1394/src/main/org/jboss/messaging/jms/client/JBossStreamMessage.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/src/main/org/jboss/messaging/jms/client/JBossStreamMessage.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -553,13 +553,8 @@
if (!readOnly)
{
readOnly = true;
-
- getBody().flip();
}
- else
- {
- getBody().rewind();
- }
+ getBody().resetReaderIndex();
}
// JBossMessage overrides ----------------------------------------
@@ -567,8 +562,7 @@
public void clearBody() throws JMSException
{
super.clearBody();
- MessagingBuffer currentBody = message.getBody();
- message.setBody(currentBody.createNewBuffer(1024));
+ message.getBody().clear();
}
public void doBeforeSend() throws Exception
Modified: branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/basic/AutoGroupClientTest.java
===================================================================
--- branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/basic/AutoGroupClientTest.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/basic/AutoGroupClientTest.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -97,7 +97,6 @@
ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
System.currentTimeMillis(), (byte) 1);
message.getBody().writeString("testINVMCoreClient");
- message.getBody().flip();
message.setDurable(false);
producer.send(message);
}
@@ -159,7 +158,6 @@
ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
System.currentTimeMillis(), (byte) 1);
message.getBody().writeString("testINVMCoreClient");
- message.getBody().flip();
producer.send(message);
}
for (int i = 0; i < numMessages; i++)
@@ -167,7 +165,6 @@
ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
System.currentTimeMillis(), (byte) 1);
message.getBody().writeString("testINVMCoreClient");
- message.getBody().flip();
producer2.send(message);
}
latch.await();
@@ -223,7 +220,6 @@
ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
System.currentTimeMillis(), (byte) 1);
message.getBody().writeString("testINVMCoreClient");
- message.getBody().flip();
message.setDurable(false);
producer.send(message);
}
Modified: branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/basic/CoreClientTest.java
===================================================================
--- branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/basic/CoreClientTest.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/basic/CoreClientTest.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -32,6 +32,7 @@
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.buffers.HeapChannelBuffer;
import org.jboss.messaging.core.server.Messaging;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.jms.client.JBossTextMessage;
@@ -91,14 +92,15 @@
ClientProducer producer = session.createProducer(QUEUE);
- final int numMessages = 10000;
+ final int numMessages = 1000;
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
System.currentTimeMillis(), (byte) 1);
+ message.setBody(new HeapChannelBuffer(3000));
+
message.getBody().writeString("testINVMCoreClient");
- message.getBody().flip();
producer.send(message);
}
Modified: branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/basic/SelfExpandingBufferTest.java
===================================================================
--- branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/basic/SelfExpandingBufferTest.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/basic/SelfExpandingBufferTest.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -99,7 +99,6 @@
{
buffer.writeBytes(new byte[1024]);
}
- buffer.flip();
ClientProducer prod = session.createProducer(ADDRESS);
Modified: branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
===================================================================
--- branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -35,11 +35,10 @@
import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.impl.MessageImpl;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.buffers.HeapChannelBuffer;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.core.server.Queue;
@@ -296,8 +295,8 @@
else
{
MessagingBuffer buffer = message.getBody();
- buffer.rewind();
- assertEquals(numberOfIntegers * DataConstants.SIZE_INT, buffer.limit());
+ buffer.resetReaderIndex();
+ assertEquals(numberOfIntegers * DataConstants.SIZE_INT, buffer.writerIndex());
for (int b = 0; b < numberOfIntegers; b++)
{
assertEquals(b, buffer.readInt());
@@ -339,14 +338,12 @@
protected MessagingBuffer createLargeBuffer(final int numberOfIntegers)
{
- ByteBuffer ioBuffer = ByteBuffer.allocate(DataConstants.SIZE_INT * numberOfIntegers);
- MessagingBuffer body = new ByteBufferWrapper(ioBuffer);
+ MessagingBuffer body = new HeapChannelBuffer(DataConstants.SIZE_INT * numberOfIntegers);
for (int i = 0; i < numberOfIntegers; i++)
{
body.writeInt(i);
}
- body.flip();
return body;
Modified: branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -42,7 +42,7 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.Message;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.buffers.HeapChannelBuffer;
import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
@@ -570,7 +570,7 @@
assertNotNull(clientMessage);
- assertEquals(numberOfIntegers * 4, clientMessage.getBody().limit());
+ assertEquals(numberOfIntegers * 4, clientMessage.getBody().writerIndex());
clientMessage.acknowledge();
@@ -656,8 +656,6 @@
ClientProducer producer = session.createProducer(ADDRESS);
- ByteBuffer ioBuffer = ByteBuffer.allocate(DataConstants.SIZE_INT * numberOfIntegers);
-
// printBuffer("body to be sent : " , body);
ClientMessage message = null;
@@ -666,13 +664,12 @@
for (int i = 0; i < 100; i++)
{
- MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+ MessagingBuffer bodyLocal = new HeapChannelBuffer(DataConstants.SIZE_INT * numberOfIntegers);
for (int j = 1; j <= numberOfIntegers; j++)
{
bodyLocal.writeInt(j);
}
- bodyLocal.flip();
if (i == 0)
{
@@ -721,7 +718,7 @@
try
{
- assertEqualsByteArrays(body.limit(), body.array(), message2.getBody().array());
+ assertEqualsByteArrays(body.writerIndex(), body.array(), message2.getBody().array());
}
catch (AssertionFailedError e)
{
Modified: branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeReconnectTest.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeReconnectTest.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -167,7 +167,6 @@
{
ClientMessage message = session0.createClientMessage(false);
message.putIntProperty(propKey, i);
- message.getBody().flip();
prod0.send(message);
}
@@ -298,7 +297,6 @@
{
ClientMessage message = session0.createClientMessage(false);
message.putIntProperty(propKey, i);
- message.getBody().flip();
prod0.send(message);
}
@@ -456,7 +454,6 @@
{
ClientMessage message = session0.createClientMessage(false);
message.putIntProperty(propKey, i);
- message.getBody().flip();
prod0.send(message);
}
@@ -573,7 +570,6 @@
{
ClientMessage message = session0.createClientMessage(false);
message.putIntProperty(propKey, i);
- message.getBody().flip();
prod0.send(message);
}
@@ -687,7 +683,6 @@
{
ClientMessage message = session0.createClientMessage(false);
message.putIntProperty(propKey, i);
- message.getBody().flip();
prod0.send(message);
}
@@ -708,7 +703,6 @@
{
ClientMessage message = session0.createClientMessage(false);
message.putIntProperty(propKey, i);
- message.getBody().flip();
prod0.send(message);
}
Modified: branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeTest.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeTest.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -371,8 +371,6 @@
message.getBody().writeString("doo be doo be doo be doo");
- message.getBody().flip();
-
producer0.send(message);
}
Modified: branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/SimpleTransformer.java
===================================================================
--- branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/SimpleTransformer.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/SimpleTransformer.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -63,8 +63,6 @@
{
throw new IllegalStateException("Wrong body!!");
}
-
- buffer.flip();
buffer.writeString("dee be dee be dee be dee");
Modified: branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ActivationTimeoutTest.java
===================================================================
--- branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ActivationTimeoutTest.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ActivationTimeoutTest.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -110,7 +110,6 @@
(byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBody().writeString("aardvarks");
- message.getBody().flip();
producer.send(message);
}
log.info("Sent messages");
@@ -197,7 +196,6 @@
(byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBody().writeString("aardvarks");
- message.getBody().flip();
producer.send(message);
}
log.info("Sent messages");
Modified: branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/AutomaticFailoverWithDiscoveryTest.java
===================================================================
--- branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/AutomaticFailoverWithDiscoveryTest.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/AutomaticFailoverWithDiscoveryTest.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -109,7 +109,6 @@
(byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBody().writeString("aardvarks");
- message.getBody().flip();
producer.send(message);
}
Modified: branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailBackupServerTest.java
===================================================================
--- branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailBackupServerTest.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailBackupServerTest.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -105,7 +105,6 @@
(byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBody().writeString("aardvarks");
- message.getBody().flip();
producer.send(message);
}
@@ -150,7 +149,6 @@
message = session1.createClientMessage(JBossTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBody().writeString("aardvarks");
- message.getBody().flip();
producer.send(message);
}
@@ -198,7 +196,6 @@
(byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBody().writeString("aardvarks");
- message.getBody().flip();
producer.send(message);
}
Modified: branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverExpiredMessageTest.java
===================================================================
--- branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverExpiredMessageTest.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverExpiredMessageTest.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -115,7 +115,6 @@
(byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBody().writeString("aardvarks");
- message.getBody().flip();
producer.send(message);
}
ClientConsumer consumer1 = session1.createConsumer(ADDRESS);
Modified: branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverManagementTest.java
===================================================================
--- branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverManagementTest.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverManagementTest.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -106,8 +106,6 @@
{
ClientMessage msg = session1.createClientMessage(false);
- msg.getBody().flip();
-
producer.send(msg);
}
@@ -119,7 +117,6 @@
ObjectNames.getQueueObjectName(ADDRESS, ADDRESS),
"MessageCount");
managementMessage.putStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME, replyTo);
- managementMessage.getBody().flip();
producer.send(DEFAULT_MANAGEMENT_ADDRESS, managementMessage);
}
@@ -139,8 +136,6 @@
ObjectNames.getQueueObjectName(ADDRESS, ADDRESS),
"MessageCount");
managementMessage.putStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME, replyTo);
-
- managementMessage.getBody().flip();
producer.send(DEFAULT_MANAGEMENT_ADDRESS, managementMessage);
}
@@ -200,8 +195,6 @@
{
ClientMessage msg = session1.createClientMessage(false);
- msg.getBody().flip();
-
producer.send(msg);
}
@@ -213,7 +206,6 @@
ObjectNames.getQueueObjectName(ADDRESS, ADDRESS),
"MessageCount");
managementMessage.putStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME, replyTo);
- managementMessage.getBody().flip();
producer.send(DEFAULT_MANAGEMENT_ADDRESS, managementMessage);
}
Modified: branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverNoSessionsFailoverTest.java
===================================================================
--- branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverNoSessionsFailoverTest.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverNoSessionsFailoverTest.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -101,7 +101,6 @@
(byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBody().writeString("aardvarks");
- message.getBody().flip();
producer.send(message);
}
@@ -148,7 +147,6 @@
(byte)1);
message2.putIntProperty(new SimpleString("count"), i);
message2.getBody().writeString("aardvarks");
- message2.getBody().flip();
producer2.send(message2);
}
Modified: branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverPreAcknowledgeTest.java
===================================================================
--- branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverPreAcknowledgeTest.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverPreAcknowledgeTest.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -94,7 +94,6 @@
false);
message.putIntProperty(new SimpleString("count"), i);
message.getBody().writeString("more aardvarks");
- message.getBody().flip();
producer.send(message);
}
ClientConsumer consumer1 = session1.createConsumer(ADDRESS);
Modified: branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverScheduledMessageTest.java
===================================================================
--- branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverScheduledMessageTest.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverScheduledMessageTest.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -116,7 +116,6 @@
(byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBody().writeString("aardvarks");
- message.getBody().flip();
long deliveryTime = now + delay * i;
message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, deliveryTime);
producer.send(message);
Modified: branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailureOnCreateConnectionTest.java
===================================================================
--- branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailureOnCreateConnectionTest.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailureOnCreateConnectionTest.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -147,7 +147,6 @@
(byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBody().writeString("aardvarks");
- message.getBody().flip();
producer.send(message);
}
Modified: branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/JustReplicationTest.java
===================================================================
--- branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/JustReplicationTest.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/integration/cluster/failover/JustReplicationTest.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -30,7 +30,7 @@
import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.buffers.ByteBufferBackedChannelBuffer;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.util.SimpleString;
@@ -163,11 +163,12 @@
ByteBuffer buffer = ByteBuffer.allocate(numberOfBytes);
buffer.putInt(i);
+
+ MessagingBuffer msgbuffer = new ByteBufferBackedChannelBuffer(buffer);
+ msgbuffer.writerIndex(msgbuffer.capacity());
- buffer.rewind();
+ message.setBody(msgbuffer);
- message.setBody(new ByteBufferWrapper(buffer));
-
producer.send(message);
}
@@ -186,10 +187,8 @@
MessagingBuffer buffer = message.getBody();
- buffer.rewind();
+ assertEquals(numberOfBytes, buffer.writerIndex());
- assertEquals(numberOfBytes, buffer.limit());
-
assertEquals(i, buffer.readInt());
}
Modified: branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -252,7 +252,7 @@
public String getTextMessage(ClientMessage m)
{
- m.getBody().rewind();
+ m.getBody().resetReaderIndex();
return m.getBody().readString();
}
@@ -264,7 +264,6 @@
System.currentTimeMillis(),
(byte)1);
message.getBody().writeString(s);
- message.getBody().flip();
return message;
}
@@ -276,7 +275,6 @@
System.currentTimeMillis(),
(byte)1);
message.getBody().writeBytes(b);
- message.getBody().flip();
return message;
}
Modified: branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2009-02-25 21:30:38 UTC (rev 5932)
+++ branches/JBMESSAGING_1394/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2009-02-26 02:04:43 UTC (rev 5933)
@@ -45,7 +45,8 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.buffers.ByteBufferBackedChannelBuffer;
+import org.jboss.messaging.core.remoting.buffers.HeapChannelBuffer;
import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.MessageReference;
@@ -482,7 +483,7 @@
byte[] compareArray = new byte[size];
- MessagingBuffer buffer = new ByteBufferWrapper(ByteBuffer.wrap(compareArray));
+ MessagingBuffer buffer = new HeapChannelBuffer(compareArray);
encoding.encode(buffer);
for (int i = 0; i < expectedArray.length; i++)
@@ -595,7 +596,7 @@
0,
System.currentTimeMillis(),
(byte)4,
- new ByteBufferWrapper(ByteBuffer.allocateDirect(1024)));
+ new ByteBufferBackedChannelBuffer(ByteBuffer.allocateDirect(1024)));
message.setMessageID(id);
@@ -640,7 +641,6 @@
System.currentTimeMillis(),
(byte)1);
message.getBody().writeString(s);
- message.getBody().flip();
return message;
}
// Private -------------------------------------------------------
More information about the jboss-cvs-commits
mailing list