[jboss-cvs] JBoss Messaging SVN: r5103 - in branches/Branch_Chunk_Clebert: src/main/org/jboss/messaging/core/journal/impl and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Oct 10 18:56:14 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-10-10 18:56:13 -0400 (Fri, 10 Oct 2008)
New Revision: 5103
Modified:
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/Message.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerSession.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/chunkmessage/MessageChunkTest.java
Log:
Server2client chunks
(a lot of work for a Friday :-) )
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-10-10 22:56:13 UTC (rev 5103)
@@ -33,8 +33,8 @@
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
@@ -57,8 +57,8 @@
// Attributes -----------------------------------------------------------------------------------
- // TODO This is temporary, make this better
- private static final int BIG_PACKAGE_SIZE = 10 * 1024;
+ // TODO This is temporary, make this configurable somewhere
+ public static final int BIG_PACKAGE_SIZE = 10 * 1024;
// TODO This is temporary, make this better
public static final int CHUNK_SIZE = 10 * 1024;
@@ -149,14 +149,14 @@
doSend(address, msg, 0);
}
- public void send(final ClientMessage msg, long scheduleDeliveryTime) throws MessagingException
+ public void send(final ClientMessage msg, final long scheduleDeliveryTime) throws MessagingException
{
checkClosed();
doSend(null, msg, scheduleDeliveryTime);
}
- public void send(final SimpleString address, final ClientMessage msg, long scheduleDeliveryTime) throws MessagingException
+ public void send(final SimpleString address, final ClientMessage msg, final long scheduleDeliveryTime) throws MessagingException
{
checkClosed();
@@ -305,7 +305,7 @@
closed = true;
}
- private void doSend(final SimpleString address, final ClientMessage msg, long scheduledDeliveryTime) throws MessagingException
+ private void doSend(final SimpleString address, final ClientMessage msg, final long scheduledDeliveryTime) throws MessagingException
{
if (address != null)
{
@@ -330,57 +330,17 @@
boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
-
if (msg.getEncodeSize() > BIG_PACKAGE_SIZE)
{
- int headerSize = msg.getPropertiesEncodeSize();
+ sendMessageInChunks(msg);
- if (headerSize > BIG_PACKAGE_SIZE)
- {
- throw new MessagingException(MessagingException.ILLEGAL_STATE,
- "Header size is too big, use the messageBody for large data");
- }
-
- MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(headerSize));
- msg.encodeProperties(headerBuffer);
-
- final int bodySize = msg.getBodyEncodeSize();
-
- int bodyLength = BIG_PACKAGE_SIZE - headerSize;
-
- MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
-
- msg.encodeBody(bodyBuffer, 0, bodyLength);
-
- SessionSendChunkMessage chunk = new SessionSendChunkMessage(id,
- headerBuffer.array(),
- bodyBuffer.array(),
- true,
- true);
-
- channel.sendBlocking(chunk);
-
- for (int pos = bodyLength; pos < bodySize;)
- {
- bodyLength = Math.min(bodySize - pos, CHUNK_SIZE);
- bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
-
- msg.encodeBody(bodyBuffer, pos, bodyLength);
-
- chunk = new SessionSendChunkMessage(id, null, bodyBuffer.array(), pos + bodyLength < bodySize, true);
-
- channel.sendBlocking(chunk);
-
- pos += bodyLength;
- }
-
}
else
{
SessionSendMessage message;
- //check to see if this message need to be scheduled.
- if(scheduledDeliveryTime <= 0)
+ // check to see if this message needs to be scheduled.
+ if (scheduledDeliveryTime <= 0)
{
message = new SessionSendMessage(id, msg, sendBlocking);
}
@@ -412,6 +372,54 @@
}
}
+ /**
+ * @param msg
+ * @throws MessagingException
+ */
+ private void sendMessageInChunks(final ClientMessage msg) throws MessagingException
+ {
+ int headerSize = msg.getPropertiesEncodeSize();
+
+ if (headerSize > BIG_PACKAGE_SIZE)
+ {
+ throw new MessagingException(MessagingException.ILLEGAL_STATE,
+ "Header size is too big, use the messageBody for large data");
+ }
+
+ MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(headerSize));
+ msg.encodeProperties(headerBuffer);
+
+ final int bodySize = msg.getBodySize();
+
+ int bodyLength = BIG_PACKAGE_SIZE - headerSize;
+
+ MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
+
+ msg.encodeBody(bodyBuffer, 0, bodyLength);
+
+ SessionSendChunkMessage chunk = new SessionSendChunkMessage(id,
+ headerBuffer.array(),
+ bodyBuffer.array(),
+ bodyLength < bodySize,
+ true);
+
+ channel.sendBlocking(chunk);
+
+ for (int pos = bodyLength; pos < bodySize;)
+ {
+ bodyLength = Math.min(bodySize - pos, CHUNK_SIZE);
+ bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
+
+ msg.encodeBody(bodyBuffer, pos, bodyLength);
+
+ chunk = new SessionSendChunkMessage(id, null, bodyBuffer.array(), pos + bodyLength < bodySize, true);
+
+ channel.sendBlocking(chunk);
+
+ pos += bodyLength;
+ }
+ }
+
private void checkClosed() throws MessagingException
{
if (closed)
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2008-10-10 22:56:13 UTC (rev 5103)
@@ -25,14 +25,22 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVETOKENS;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CHUNK_SEND;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
/**
*
@@ -46,6 +54,8 @@
private static final Logger log = Logger.getLogger(ClientSessionPacketHandler.class);
private final ClientSessionInternal clientSession;
+
+ private Map<Long, ClientMessage> currentChunk = new ConcurrentHashMap<Long, ClientMessage>();
public ClientSessionPacketHandler(final ClientSessionInternal clientSesssion)
{
@@ -68,6 +78,52 @@
break;
}
+ case SESS_CHUNK_SEND:
+ {
+ System.out.println("received a chunk");
+ SessionSendChunkMessage chunk = (SessionSendChunkMessage) packet;
+
+ ClientMessage currentChunkMessage = null;
+
+ if (chunk.getHeader() != null)
+ {
+
+ MessagingBuffer header = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getHeader()));
+
+ currentChunkMessage = new ClientMessageImpl();
+
+ currentChunkMessage.decodeProperties(header);
+
+ MessagingBuffer initialBody = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getBody()));
+
+ currentChunkMessage.setBody(initialBody);
+
+ currentChunk.put(chunk.getTargetID(), currentChunkMessage);
+ }
+ else
+ {
+ ByteBuffer body = ByteBuffer.wrap(chunk.getBody());
+
+ currentChunkMessage = currentChunk.get(chunk.getMessageID());
+
+ MessagingBuffer currentBody = currentChunkMessage.getBody();
+
+ MessagingBuffer newBody = new ByteBufferWrapper(ByteBuffer.allocate(currentBody.limit() + body.limit()));
+
+ newBody.putBytes(currentBody.array());
+ newBody.putBytes(body.array());
+
+ currentChunkMessage.setBody(newBody);
+ }
+
+ if (!chunk.isContinues())
+ {
+ clientSession.handleReceiveMessage(chunk.getTargetID(), currentChunkMessage);
+ }
+
+
+ break;
+ }
case SESS_RECEIVE_MSG:
{
SessionReceiveMessage message = (SessionReceiveMessage) packet;
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-10-10 22:56:13 UTC (rev 5103)
@@ -232,5 +232,10 @@
{
return (int)channel.position();
}
+
+ public String toString()
+ {
+ return "NIOSequentialFile " + this.fileName;
+ }
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/Message.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/Message.java 2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/Message.java 2008-10-10 22:56:13 UTC (rev 5103)
@@ -82,7 +82,7 @@
void decodeProperties(MessagingBuffer buffer);
- int getBodyEncodeSize();
+ int getBodySize();
// Used on Message chunk
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-10-10 22:56:13 UTC (rev 5103)
@@ -151,7 +151,7 @@
public int getEncodeSize()
{
- return getPropertiesEncodeSize() + getBodyEncodeSize();
+ return getPropertiesEncodeSize() + SIZE_INT + getBodySize();
}
public int getPropertiesEncodeSize()
@@ -165,9 +165,9 @@
/* PropertySize and Properties */properties.getEncodeSize();
}
- public int getBodyEncodeSize()
+ public int getBodySize()
{
- return /* BodySize and Body */SIZE_INT + body.limit();
+ return /* BodySize and Body */ body.limit();
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java 2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java 2008-10-10 22:56:13 UTC (rev 5103)
@@ -24,7 +24,6 @@
import java.nio.ByteBuffer;
-import org.jboss.messaging.core.client.impl.ClientProducerImpl;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.ServerLargeMessage;
@@ -77,32 +76,31 @@
}
@Override
- public void encodeBody(final MessagingBuffer bufferOut)
+ public void encodeBody(MessagingBuffer bufferOut, int start, int size)
{
try
{
- ByteBuffer bufferRead = ByteBuffer.allocate(ClientProducerImpl.CHUNK_SIZE);
+ // This could maybe be optimized (maybe reading directly into bufferOut)
+ ByteBuffer bufferRead = ByteBuffer.allocate(size);
if (!file.isOpen())
{
file.open();
}
int bytesRead = 0;
- file.position(0);
- do
- {
- bufferRead.clear();
- bytesRead = file.read(bufferRead);
- bufferRead.flip();
+ file.position(start);
- if (bytesRead > 0)
- {
- bufferOut.putBytes(bufferRead.array(), 0, bytesRead);
- }
+ bytesRead = file.read(bufferRead);
+
+
+ bufferRead.flip();
+
+ if (bytesRead > 0)
+ {
+ bufferOut.putBytes(bufferRead.array(), 0, bytesRead);
}
- while (bytesRead == ClientProducerImpl.CHUNK_SIZE);
- releaseResources();
+ //releaseResources();
}
catch (Exception e)
{
@@ -111,7 +109,7 @@
}
@Override
- public int getBodyEncodeSize()
+ public int getBodySize()
{
try
{
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-10 22:56:13 UTC (rev 5103)
@@ -147,7 +147,7 @@
ServerLargeMessage getCurrentLargeMessage(long producerID);
- ServerLargeMessage createLargeMessageStorage(long producerID, byte[] header) throws Exception;
+ ServerLargeMessage createLargeMessageStorage(long producerID, long messageID, byte[] header) throws Exception;
void clearCurrentLargeMessage(long producerID);
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-10 22:56:13 UTC (rev 5103)
@@ -22,20 +22,26 @@
package org.jboss.messaging.core.server.impl;
+import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
+import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerConsumer;
+import org.jboss.messaging.core.server.ServerLargeMessage;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.ServerSession;
import org.jboss.messaging.core.settings.HierarchicalRepository;
@@ -61,6 +67,12 @@
// Static
// ---------------------------------------------------------------------------------------
+ // TODO This is temporary, make this configurable somewhere
+ public static final int BIG_PACKAGE_SIZE = 10 * 1024;
+
+ // TODO This is temporary, make this better
+ public static final int CHUNK_SIZE = 10 * 1024;
+
// Attributes
// -----------------------------------------------------------------------------------
@@ -182,15 +194,23 @@
}
deliveringRefs.add(ref);
-
- SessionReceiveMessage packet = new SessionReceiveMessage(id, ref.getMessage(), ref.getDeliveryCount() + 1);
- channel.send(packet);
+ if (message instanceof ServerLargeMessage)
+ {
+ sendChunks((ServerLargeMessage)message);
+
+ }
+ else
+ {
+ SessionReceiveMessage packet = new SessionReceiveMessage(id, ref.getMessage(), ref.getDeliveryCount() + 1);
+ channel.send(packet);
+ }
+
return HandleStatus.HANDLED;
}
}
-
+
public void close() throws Exception
{
setStarted(false);
@@ -300,6 +320,46 @@
// Private
// --------------------------------------------------------------------------------------
+ /**
+ * @param message
+ * @throws MessagingException
+ */
+ private void sendChunks(ServerLargeMessage message) throws MessagingException
+ {
+ int headerSize = message.getPropertiesEncodeSize();
+
+ final int bodySize = message.getBodySize();
+
+ int bodyLength = BIG_PACKAGE_SIZE - headerSize;
+
+ MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(message.getPropertiesEncodeSize()));
+ message.encodeProperties(headerBuffer);
+
+ MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
+ message.encodeBody(bodyBuffer, 0, bodyLength);
+
+ SessionSendChunkMessage chunk = new SessionSendChunkMessage(id,
+ headerBuffer.array(),
+ bodyBuffer.array(),
+ bodyLength < bodySize,
+ false);
+ channel.send(chunk);
+
+ for (int pos = bodyLength; pos < bodySize;)
+ {
+ bodyLength = Math.min(bodySize - pos, CHUNK_SIZE);
+ bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
+
+ message.encodeBody(bodyBuffer, pos, bodyLength);
+
+ chunk = new SessionSendChunkMessage(id, null, bodyBuffer.array(), pos + bodyLength < bodySize, false);
+
+ channel.send(chunk);
+
+ pos += bodyLength;
+ }
+ }
+
private void promptDelivery()
{
session.promptDelivery(messageQueue);
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-10 22:56:13 UTC (rev 5103)
@@ -1172,13 +1172,16 @@
/* (non-Javadoc)
* @see org.jboss.messaging.core.server.ServerSession#createLargeMessage(long, int, byte[])
*/
- public ServerLargeMessage createLargeMessageStorage(long producerID, byte[] header) throws Exception
+ public ServerLargeMessage createLargeMessageStorage(long producerID, long messageID, byte[] header) throws Exception
{
- ServerLargeMessage largeMessage = storageManager.createLargeMessageStorage(storageManager.generateUniqueID());
+ ServerLargeMessage largeMessage = storageManager.createLargeMessageStorage(messageID);
MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.wrap(header));
largeMessage.decodeProperties(headerBuffer);
+
+ // decodeProperties will clean this, as the client didn't send the ID originally
+ largeMessage.setMessageID(messageID);
ServerProducer producer = producers.get(producerID);
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-10 22:56:13 UTC (rev 5103)
@@ -397,8 +397,7 @@
if (message.getHeader() != null)
{
- largeMessage = session.createLargeMessageStorage(message.getTargetID(), message.getHeader());
- largeMessage.setMessageID(message.getMessageID());
+ largeMessage = session.createLargeMessageStorage(message.getTargetID(), message.getMessageID(), message.getHeader());
}
else
{
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/chunkmessage/MessageChunkTest.java 2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/chunkmessage/MessageChunkTest.java 2008-10-10 22:56:13 UTC (rev 5103)
@@ -75,8 +75,10 @@
{
body.putInt(i);
}
- body.flip();
+ body.flip();
+ printBuffer("body to be sent : " , body);
+
ClientMessage message = session.createClientMessage(true);
message.setBody(body);
@@ -106,6 +108,8 @@
System.out.println("msg on client = " + message2.getMessageID());
+ printBuffer("message received : ", message2.getBody());
+
assertEqualsByteArrays(body.limit(), body.array(), message2.getBody().array());
session.close();
@@ -117,7 +121,7 @@
protected void setUp() throws Exception
{
- this.realFiles = false;
+ this.realFiles = true;
super.setUp();
}
@@ -125,9 +129,38 @@
{
super.tearDown();
}
-
+
+
// Private -------------------------------------------------------
+
+ public static void printBuffer(String msg, MessagingBuffer buffer)
+ {
+
+ buffer.rewind();
+
+ int size = buffer.limit();
+
+ System.out.print(msg);
+
+
+ for (int i = 0; i < size; i ++)
+ {
+ System.out.print(String.format("%1$X", buffer.getByte()));
+ if (i % 40 != 0 || i == 0)
+ {
+ System.out.print(", ");
+ }
+ else
+ {
+ System.out.println();
+ System.out.print(msg);
+ }
+ }
+ buffer.rewind();
+
+ }
+
// Inner classes -------------------------------------------------
}
More information about the jboss-cvs-commits
mailing list