[jboss-cvs] JBoss Messaging SVN: r5482 - in trunk: src/main/org/jboss/messaging/core/client/impl and 15 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Dec 8 22:53:21 EST 2008


Author: clebert.suconic at jboss.com
Date: 2008-12-08 22:53:21 -0500 (Mon, 08 Dec 2008)
New Revision: 5482

Added:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionContinuationMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveContinuationMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendContinuationMessage.java
   trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java
Removed:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java
   trunk/src/main/org/jboss/messaging/core/client/ClientMessage.java
   trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientFileMessageImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
   trunk/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java
   trunk/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
Chunk & Paging tweaks & PreCommit on Chunk

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -42,6 +42,8 @@
 
    void setFile(File file);
    
+   void setLargeMessage(boolean largeMessage);
+
    FileChannel getChannel() throws MessagingException;
    
    void closeChannel() throws MessagingException;     

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientMessage.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientMessage.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -46,6 +46,4 @@
    void acknowledge() throws MessagingException;
    
    boolean isLargeMessage();
-      
-   void setLargeMessage(boolean largeMessage);
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -38,13 +38,10 @@
                                boolean xa,
                                boolean autoCommitSends,
                                boolean autoCommitAcks,
-                               final boolean preAcknowledge,
+                               boolean preAcknowledge,
                                int ackBatchSize) throws MessagingException;
 
-   ClientSession createSession(final boolean xa,
-                               final boolean autoCommitSends,
-                               final boolean autoCommitAcks,
-                               final boolean preAcknowledge) throws MessagingException;
+   ClientSession createSession(boolean xa, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge) throws MessagingException;
 
    void setConsumerWindowSize(int size);
 
@@ -64,19 +61,19 @@
 
    int getMinLargeMessageSize();
 
-   void setMinLargeMessageSize(final int minLargeMessageSize);
+   void setMinLargeMessageSize(int minLargeMessageSize);
 
    boolean isBlockOnPersistentSend();
 
-   void setBlockOnPersistentSend(final boolean blocking);
+   void setBlockOnPersistentSend(boolean blocking);
 
    boolean isBlockOnNonPersistentSend();
 
-   void setBlockOnNonPersistentSend(final boolean blocking);
+   void setBlockOnNonPersistentSend(boolean blocking);
 
    boolean isBlockOnAcknowledge();
 
-   void setBlockOnAcknowledge(final boolean blocking);
+   void setBlockOnAcknowledge(boolean blocking);
 
    boolean isAutoGroup();
 
@@ -95,10 +92,10 @@
    long getCallTimeout();
 
    int getMaxConnections();
-   
-//   TransportConfiguration getTransportConfiguration();
-//   
-//   TransportConfiguration getBackupTransportConfiguration();
-   
+
+   // TransportConfiguration getTransportConfiguration();
+   //   
+   // TransportConfiguration getBackupTransportConfiguration();
+
    void close();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -28,7 +28,7 @@
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.util.Future;
 
@@ -183,11 +183,7 @@
             {
                boolean expired = m.isExpired();
 
-               // Chunk messages will execute the flow control while receiving the chunks
-               if (!m.isLargeMessage())
-               {
-                  flowControl(m.getEncodeSize());
-               }
+               flowControlBeforeConsumption(m);
 
                if (expired)
                {
@@ -337,61 +333,60 @@
       }
    }
 
-   public void handleChunk(SessionSendChunkMessage chunk) throws Exception
+
+   public synchronized void handleLargeMessage(final byte[] header) throws Exception
    {
-      if (closed)
+      if (closing)
       {
+         // This is ok - we just ignore the message
          return;
       }
 
-      flowControl(chunk.getBody().length);
+      currentChunkMessage = createFileMessage(header);
+      
+      // We won't call flow control at this point, as we will only flow control the header right before consumption
+      
 
-      if (chunk.getHeader() != null)
+   }
+
+   public synchronized void handleLargeMessageContinuation(SessionReceiveContinuationMessage chunk) throws Exception
+   {
+      if (closing)
       {
-         // The Header only comes on the first message, so a buffer has to be created on the client
-         // to hold either a file or a big message
-         MessagingBuffer header = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getHeader()));
+         return;
+      }
 
-         currentChunkMessage = createFileMessage(header);
+      ByteBuffer body = ByteBuffer.wrap(chunk.getBody());
 
-         if (currentChunkMessage instanceof ClientFileMessage)
-         {
-            ClientFileMessage fileMessage = (ClientFileMessage)currentChunkMessage;
-            addBytesBody(fileMessage, chunk.getBody());
-         }
-         else
-         {
-            MessagingBuffer initialBody = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getBody()));
-            currentChunkMessage.setBody(initialBody);
-         }
+      flowControl(chunk.getBody().length);
+
+      if (isFileConsumer())
+      {
+         ClientFileMessage fileMessage = (ClientFileMessage)currentChunkMessage;
+         addBytesBody(fileMessage, chunk.getBody());
       }
       else
       {
-         // No header.. this is then a continuation of a previous message
-         ByteBuffer body = ByteBuffer.wrap(chunk.getBody());
+         MessagingBuffer currentBody = currentChunkMessage.getBody();
+         
+         final int currentBodySize = currentBody == null ? 0 : currentBody.limit();
 
-         if (currentChunkMessage instanceof ClientFileMessage)
+         MessagingBuffer newBody = new ByteBufferWrapper(ByteBuffer.allocate(currentBodySize + body.limit()));
+
+         if (currentBody != null)
          {
-            ClientFileMessage fileMessage = (ClientFileMessage)currentChunkMessage;
-            addBytesBody(fileMessage, chunk.getBody());
+            newBody.putBytes(currentBody.array());
          }
-         else
-         {
-            MessagingBuffer currentBody = currentChunkMessage.getBody();
+         
+         newBody.putBytes(body.array());
 
-            MessagingBuffer newBody = new ByteBufferWrapper(ByteBuffer.allocate(currentBody.limit() + body.limit()));
-
-            newBody.putBytes(currentBody.array());
-            newBody.putBytes(body.array());
-
-            currentChunkMessage.setBody(newBody);
-         }
+         currentChunkMessage.setBody(newBody);
       }
 
       if (!chunk.isContinues())
       {
          // Close the file that was being generated
-         if (currentChunkMessage instanceof ClientFileMessage)
+         if (isFileConsumer())
          {
             ((ClientFileMessage)currentChunkMessage).closeChannel();
          }
@@ -542,7 +537,7 @@
          {
             boolean expired = message.isExpired();
 
-            flowControl(message.getEncodeSize());
+            flowControlBeforeConsumption(message);
 
             if (!expired)
             {
@@ -558,6 +553,24 @@
       }
    }
 
+   /**
+    * @param message
+    * @throws MessagingException
+    */
+   private void flowControlBeforeConsumption(ClientMessage message) throws MessagingException
+   {
+      // Chunk messages will execute the flow control while receiving the chunks
+      if (!message.isLargeMessage())
+      {
+         flowControl(message.getEncodeSize());
+      }
+      else
+      {
+         // But the header is only flow controlled right before the consumption
+         flowControl(message.getPropertiesEncodeSize());
+      }
+   }
+
    private void doCleanUp(final boolean sendCloseMessage) throws MessagingException
    {
       try
@@ -614,41 +627,52 @@
 
    private ClientFileMessage cloneAsFileMessage(final ClientMessage message) throws Exception
    {
-      int propertiesSize = message.getPropertiesEncodeSize();
-
-      MessagingBuffer bufferProperties = message.getBody().createNewBuffer(propertiesSize);
-
-      // FIXME: Find a better way to clone this ClientMessageImpl as ClientFileMessageImpl without using the
-      // MessagingBuffer.
-      // There is no direct access into the Properties, and I couldn't add a direct cast to this method without loose
-      // abstraction
-      message.encodeProperties(bufferProperties);
-
-      bufferProperties.rewind();
-
-      ClientFileMessageImpl cloneMessage = new ClientFileMessageImpl();
-
-      cloneMessage.decodeProperties(bufferProperties);
-
-      cloneMessage.setDeliveryCount(message.getDeliveryCount());
-
-      cloneMessage.setLargeMessage(message.isLargeMessage());
-
-      cloneMessage.setFile(new File(this.directory, cloneMessage.getMessageID() + "-" +
-                                                    this.session.getName() +
-                                                    "-" +
-                                                    this.getID() +
-                                                    ".jbm"));
-
-      addBytesBody(cloneMessage, message.getBody().array());
-
-      cloneMessage.closeChannel();
-
-      return cloneMessage;
+      if (message instanceof ClientFileMessageImpl)
+      {
+         // nothing to be done
+         return (ClientFileMessage)message;
+      }
+      else
+      {
+         int propertiesSize = message.getPropertiesEncodeSize();
+   
+         MessagingBuffer bufferProperties = message.getBody().createNewBuffer(propertiesSize);
+   
+         // FIXME: Find a better way to clone this ClientMessageImpl as ClientFileMessageImpl without using the
+         // MessagingBuffer.
+         // There is no direct access into the Properties, and I couldn't add a direct cast to this method without loose
+         // abstraction
+         message.encodeProperties(bufferProperties);
+   
+         bufferProperties.rewind();
+   
+         ClientFileMessageImpl cloneMessage = new ClientFileMessageImpl();
+   
+         cloneMessage.decodeProperties(bufferProperties);
+   
+         cloneMessage.setDeliveryCount(message.getDeliveryCount());
+   
+         cloneMessage.setLargeMessage(message.isLargeMessage());
+   
+         cloneMessage.setFile(new File(this.directory, cloneMessage.getMessageID() + "-" +
+                                                       this.session.getName() +
+                                                       "-" +
+                                                       this.getID() +
+                                                       ".jbm"));
+   
+         addBytesBody(cloneMessage, message.getBody().array());
+   
+         cloneMessage.closeChannel();
+   
+         return cloneMessage;
+      }
    }
 
-   private ClientMessage createFileMessage(final MessagingBuffer propertiesBuffer) throws Exception
+   private ClientMessage createFileMessage(final byte[] header) throws Exception
    {
+      
+      MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.wrap(header));
+
       if (isFileConsumer())
       {
          if (!this.directory.exists())
@@ -657,7 +681,7 @@
          }
 
          ClientFileMessageImpl message = new ClientFileMessageImpl();
-         message.decodeProperties(propertiesBuffer);
+         message.decodeProperties(headerBuffer);
          message.setFile(new File(this.directory, message.getMessageID() + "-" +
                                                   this.session.getName() +
                                                   "-" +
@@ -669,7 +693,7 @@
       else
       {
          ClientMessageImpl message = new ClientMessageImpl();
-         message.decodeProperties(propertiesBuffer);
+         message.decodeProperties(headerBuffer);
          message.setLargeMessage(true);
          return message;
       }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -25,7 +25,7 @@
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 
 /**
  * 
@@ -39,8 +39,10 @@
    long getID();
 
    void handleMessage(ClientMessage message) throws Exception;
+
+   void handleLargeMessage(byte[] largeMessageHeader) throws Exception;
    
-   void handleChunk(SessionSendChunkMessage chunk) throws Exception;
+   void handleLargeMessageContinuation(SessionReceiveContinuationMessage continuation) throws Exception;
 
    void clear();
 
@@ -55,4 +57,5 @@
    void acknowledge(ClientMessage message) throws MessagingException;
    
    void flushAcks() throws MessagingException;
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientFileMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientFileMessageImpl.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientFileMessageImpl.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -113,8 +113,16 @@
    @Override
    public MessagingBuffer getBody()
    {
-      // TODO: Throw an unsuported exception. (Make sure no tests are using this method first)
+      throw new UnsupportedOperationException("getBody is not supported on FileMessages.");
+   }
 
+   /**
+    * If a ClientFileMessage is Smaller then the MinLargeMessage configured on the SessionFactory (or JMSConnectionFactory), it will still be sent as any other message,
+    * and for that the file body (which should be small) will be read from the file an populated on the output buffer
+    *  
+    *  */
+   public void encodeBody(MessagingBuffer buffer)
+   {
       FileChannel channel = null;
       try
       {
@@ -122,12 +130,12 @@
          // for a better performance, users should be using the channels when using file
          channel = newChannel();
 
-         ByteBuffer buffer = ByteBuffer.allocate((int)channel.size());
+         ByteBuffer fileBuffer = ByteBuffer.allocate((int)channel.size());
 
          channel.position(0);
-         channel.read(buffer);
+         channel.read(fileBuffer);
 
-         return new ByteBufferWrapper(buffer);
+         buffer.putBytes(fileBuffer.array(), 0, fileBuffer.limit());
       }
       catch (Exception e)
       {
@@ -146,6 +154,9 @@
       }
    }
 
+   /** 
+    * Read the file content from start to size.
+    */
    @Override
    public synchronized void encodeBody(final MessagingBuffer buffer, final long start, final int size)
    {

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -32,7 +32,7 @@
 import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.util.SimpleString;
@@ -260,39 +260,38 @@
 
       final int bodySize = msg.getBodySize();
 
-      int chunkLength = minLargeMessageSize - headerSize;
+      SessionSendMessage initialChunk = new SessionSendMessage(headerBuffer.array(), false);
 
-      MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(chunkLength));
+      channel.send(initialChunk);
 
-      msg.encodeBody(bodyBuffer, 0, chunkLength);
-
-      SessionSendChunkMessage chunk = new SessionSendChunkMessage(-1,
-                                                                  headerBuffer.array(),
-                                                                  bodyBuffer.array(),
-                                                                  chunkLength < bodySize,
-                                                                  sendBlocking);
-
-      if (sendBlocking)
+      for (int pos = 0; pos < bodySize;)
       {
-         channel.sendBlocking(chunk);
-      }
-      else
-      {
-         channel.send(chunk);
-      }
-
-      for (int pos = chunkLength; pos < bodySize;)
-      {
-         chunkLength = Math.min(bodySize - pos, minLargeMessageSize);
+         final int chunkLength;
+         final boolean lastChunk;
          
-         bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(chunkLength));
+         
+         final int bytesToWrite = bodySize - pos;
+         
+         if (bytesToWrite < minLargeMessageSize)
+         {
+            lastChunk = true;
+            chunkLength = bytesToWrite;
+         }
+         else
+         {
+            lastChunk = false;
+            chunkLength = minLargeMessageSize;
+         }
+         
+         final MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(chunkLength));
 
          msg.encodeBody(bodyBuffer, pos, chunkLength);
 
-         chunk = new SessionSendChunkMessage(-1, null, bodyBuffer.array(), pos + chunkLength < bodySize, sendBlocking);
+         final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.array(), !lastChunk, lastChunk && sendBlocking);
 
-         if (sendBlocking)
+         if (sendBlocking && lastChunk)
          {
+            // When sending it blocking, only the last chunk will be blocking.
             channel.sendBlocking(chunk);
          }
          else

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -61,8 +61,8 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
@@ -639,14 +639,24 @@
          consumer.handleMessage(message);
       }
    }
+   
+   public void handleReceiveLargeMessage(final long consumerID, final byte[] headerBytes) throws Exception
+   {
+      ClientConsumerInternal consumer = consumers.get(consumerID);
 
-   public void handleReceiveChunk(final long consumerID, final SessionSendChunkMessage chunk) throws Exception
+      if (consumer != null)
+      {
+         consumer.handleLargeMessage(headerBytes);
+      }
+   }
+
+   public void handleReceiveContinuation(final long consumerID, final SessionReceiveContinuationMessage continuation) throws Exception
    {
       ClientConsumerInternal consumer = consumers.get(consumerID);
 
       if (consumer != null)
       {
-         consumer.handleChunk(chunk);
+         consumer.handleLargeMessageContinuation(continuation);
       }
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -16,7 +16,7 @@
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 
 /**
  * A ClientSessionInternal
@@ -39,9 +39,11 @@
 
    void removeProducer(ClientProducerInternal producer);
 
+   void handleReceiveLargeMessage(final long consumerID, final byte[] headerBytes) throws Exception;
+
    void handleReceiveMessage(long consumerID, ClientMessage message) throws Exception;
    
-   void handleReceiveChunk(long consumerID, SessionSendChunkMessage chunk) throws Exception;
+   void handleReceiveContinuation(final long consumerID, final SessionReceiveContinuationMessage continuation) throws Exception;
 
    void handleFailover(RemotingConnection backupConnection);
    

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -23,7 +23,7 @@
 package org.jboss.messaging.core.client.impl;
 
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CHUNK_SEND;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_CONTINUATION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
 
 import org.jboss.messaging.core.logging.Logger;
@@ -32,7 +32,7 @@
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 
 /**
  *
@@ -64,19 +64,26 @@
       {
          switch (type)
          {
-            case SESS_CHUNK_SEND:
+            case SESS_RECEIVE_CONTINUATION:
             {
-               SessionSendChunkMessage chunk = (SessionSendChunkMessage)packet;
-               clientSession.handleReceiveChunk(chunk.getTargetID(), chunk);
+               SessionReceiveContinuationMessage continuation = (SessionReceiveContinuationMessage)packet;
+               clientSession.handleReceiveContinuation(continuation.getConsumerID(), continuation);
 
                break;
             }
             case SESS_RECEIVE_MSG:
             {
                SessionReceiveMessage message = (SessionReceiveMessage) packet;
-      
-               clientSession.handleReceiveMessage(message.getConsumerID(), message.getClientMessage());
                
+               if (message.isLargeMessage())
+               {
+                  clientSession.handleReceiveLargeMessage(message.getConsumerID(), message.getLargeMessageHeader());
+               }
+               else
+               {
+                  clientSession.handleReceiveMessage(message.getConsumerID(), message.getClientMessage());
+               }
+               
                break;
             }
             case EXCEPTION:

Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -395,6 +395,34 @@
    {
       return this.properties;
    }
+
+   /**
+     * Constructs a <code>String</code> with all attributes
+     * in name = value format.
+     *
+     * @return a <code>String</code> representation 
+     * of this object.
+     */
+    public String toString()
+    {
+        final String TAB = ", ";
+        
+        StringBuffer retValue = new StringBuffer();
+        
+        retValue.append("MessageImpl ( ")
+            .append("messageID = ").append(this.messageID).append(TAB)
+            .append("destination = ").append(this.destination).append(TAB)
+            .append("type = ").append(this.type).append(TAB)
+            .append("durable = ").append(this.durable).append(TAB)
+            .append("expiration = ").append(this.expiration).append(TAB)
+            .append("timestamp = ").append(this.timestamp).append(TAB)
+            .append("properties = ").append(this.properties).append(TAB)
+            .append("priority = ").append(this.priority).append(TAB)
+            .append("body = ").append(this.body).append(TAB)
+            .append(" )");
+        
+        return retValue.toString();
+    }
    
    // Private -------------------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -159,4 +159,5 @@
     * 
     */
    void startGlobalDepage();
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -281,9 +281,8 @@
       {
          store.startDepaging(pagingSPI.getGlobalDepagerExecutor());
       }
-      globalMode.set(false);
    }
-
+   
    /* (non-Javadoc)
     * @see org.jboss.messaging.core.paging.PagingManager#getGlobalSize()
     */

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -27,6 +27,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -69,7 +70,7 @@
 
    private final DecimalFormat format = new DecimalFormat("000000000");
 
-   private final AtomicInteger pageUsedSize = new AtomicInteger(0);
+   private final AtomicInteger currentPageSize = new AtomicInteger(0);
 
    private final SimpleString storeName;
 
@@ -90,7 +91,7 @@
    // Bytes consumed by the queue on the memory
    private final AtomicLong sizeInBytes = new AtomicLong();
 
-   private volatile Runnable depageAction;
+   private final AtomicBoolean depaging = new AtomicBoolean(false);
 
    private volatile int numberOfPages;
 
@@ -214,80 +215,6 @@
       return storeName;
    }
 
-   /** 
-    *  It returns a Page out of the Page System without reading it. 
-    *  The method calling this method will remove the page and will start reading it outside of any locks. 
-    *  
-    * */
-   public Page depage() throws Exception
-   {
-      writeLock.lock();
-      currentPageLock.writeLock().lock(); // Make sure no checks are done on currentPage while we are depaging
-
-      try
-      {
-         if (numberOfPages == 0)
-         {
-            return null;
-         }
-         else
-         {
-            numberOfPages--;
-
-            final Page returnPage;
-
-            // We are out of old pages, all that is left now is the current page.
-            // On that case we need to replace it by a new empty page, and return the current page immediately
-            if (currentPageId == firstPageId)
-            {
-               firstPageId = Integer.MAX_VALUE;
-
-               if (currentPage != null)
-               {
-                  returnPage = currentPage;
-                  returnPage.close();
-                  currentPage = null;
-               }
-               else
-               {
-                  // sanity check... it shouldn't happen!
-                  throw new IllegalStateException("CurrentPage is null");
-               }
-
-               // The current page is empty... what means we achieved the end of the pages
-               if (returnPage.getNumberOfMessages() == 0)
-               {
-                  returnPage.open();
-                  returnPage.delete();
-
-                  // This will trigger this Destination to exit the page mode,
-                  // and this will make JBM start using the journal again
-                  return null;
-               }
-               else
-               {
-                  // We need to create a new page, as we can't lock the address until we finish depaging.
-                  openNewPage();
-               }
-
-               return returnPage;
-            }
-            else
-            {
-               returnPage = createPage(firstPageId++);
-            }
-
-            return returnPage;
-         }
-      }
-      finally
-      {
-         currentPageLock.writeLock().unlock();
-         writeLock.unlock();
-      }
-
-   }
-
    public long addSize(final long size) throws Exception
    {
       final long maxSize = getMaxSizeBytes();
@@ -355,15 +282,16 @@
 
             if (isTrace)
             {
-               log.trace("globalMode.get = " + pagingManager.isGlobalPageMode() +
-                         " currentGlobalSize = " +
-                         currentGlobalSize +
-                         " defaultPageSize = " +
-                         pagingManager.getDefaultPageSize() +
-                         " maxGlobalSize = " +
-                         maxGlobalSize +
-                         "maxGlobalSize - defaultPageSize = " +
-                         (maxGlobalSize - pagingManager.getDefaultPageSize()));
+
+               log.trace(" globalDepage = " + pagingManager.isGlobalPageMode() +
+                     "\n currentGlobalSize = " +
+                     currentGlobalSize +
+                     "\n defaultPageSize = " +
+                     pagingManager.getDefaultPageSize() +
+                     "\n maxGlobalSize = " +
+                     maxGlobalSize +
+                     "\n maxGlobalSize - defaultPageSize = " +
+                     (maxGlobalSize - pagingManager.getDefaultPageSize()));
             }
 
             if (pagingManager.isGlobalPageMode() && currentGlobalSize < maxGlobalSize - pagingManager.getDefaultPageSize())
@@ -424,16 +352,16 @@
 
          int bytesToWrite = fileFactory.calculateBlockSize(message.getEncodeSize() + PageImpl.SIZE_RECORD);
 
-         if (pageUsedSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
+         if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
          {
-            // Make sure nothing is currently validating currentPaging
+            // Make sure nothing is currently validating or using currentPage
             currentPageLock.writeLock().lock();
             try
             {
                openNewPage();
 
-               // openNewPage will set pageUsedSize to zero, we need to set it again
-               pageUsedSize.addAndGet(bytesToWrite);
+               // openNewPage will set currentPageSize to zero, we need to set it again
+               currentPageSize.addAndGet(bytesToWrite);
             }
             finally
             {
@@ -505,11 +433,14 @@
          }
          else
          {
+            // startDepaging and clearDepage needs to be atomic.
+            // We can't use writeLock to this operation as writeLock would still be used by another thread, and still being a valid usage
             synchronized (this)
             {
-               if (depageAction == null)
+               if (!depaging.get())
                {
-                  depageAction = new DepageRunnable(executor);
+                  depaging.set(true);
+                  Runnable depageAction = new DepageRunnable(executor);
                   executor.execute(depageAction);
                   return true;
                }
@@ -685,6 +616,82 @@
       openNewPage();
    }
 
+   /** 
+    *  It returns a Page out of the Page System without reading it. 
+    *  The method calling this method will remove the page and will start reading it outside of any locks.
+    *   
+    *  Observation: This method is used internally as part of the regular depage process, but externally is used only on tests, 
+    *               and that's why this method is part of the Testable Interface 
+    * */
+   public Page depage() throws Exception
+   {
+      writeLock.lock();
+      currentPageLock.writeLock().lock(); // Make sure no checks are done on currentPage while we are depaging
+
+      try
+      {
+         if (numberOfPages == 0)
+         {
+            return null;
+         }
+         else
+         {
+            numberOfPages--;
+
+            final Page returnPage;
+
+            // We are out of old pages, all that is left now is the current page.
+            // On that case we need to replace it by a new empty page, and return the current page immediately
+            if (currentPageId == firstPageId)
+            {
+               firstPageId = Integer.MAX_VALUE;
+
+               if (currentPage != null)
+               {
+                  returnPage = currentPage;
+                  returnPage.close();
+                  currentPage = null;
+               }
+               else
+               {
+                  // sanity check... it shouldn't happen!
+                  throw new IllegalStateException("CurrentPage is null");
+               }
+
+               // The current page is empty... what means we achieved the end of the pages
+               if (returnPage.getNumberOfMessages() == 0)
+               {
+                  returnPage.open();
+                  returnPage.delete();
+
+                  // This will trigger this Destination to exit the page mode,
+                  // and this will make JBM start using the journal again
+                  return null;
+               }
+               else
+               {
+                  // We need to create a new page, as we can't lock the address until we finish depaging.
+                  openNewPage();
+               }
+
+               return returnPage;
+            }
+            else
+            {
+               returnPage = createPage(firstPageId++);
+            }
+
+            return returnPage;
+         }
+      }
+      finally
+      {
+         currentPageLock.writeLock().unlock();
+         writeLock.unlock();
+      }
+
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -699,7 +706,7 @@
     * If persistent messages are also used, it will update eventual PageTransactions
     */
 
-   private boolean onDepage(final int pageId, final SimpleString destination, final List<PagedMessage> data) throws Exception
+   private void onDepage(final int pageId, final SimpleString destination, final List<PagedMessage> data) throws Exception
    {
       trace("Depaging....");
 
@@ -720,7 +727,7 @@
          if (pageId <= lastPage.getLastId())
          {
             log.warn("Page " + pageId + " was already processed, ignoring the page");
-            return true;
+            return;
          }
       }
 
@@ -804,29 +811,52 @@
          ref.getQueue().addLast(ref);
       }
 
-      if (pagingManager.isGlobalPageMode())
-      {
-         // We use the Default Page Size when in global mode for the calculation of the Watermark
-         return pagingManager.getGlobalSize() < pagingManager.getMaxGlobalSize() - pagingManager.getDefaultPageSize() && getMaxSizeBytes() <= 0 ||
-                getAddressSize() < getMaxSizeBytes();
-      }
-      else
-      {
-         // If Max-size is not configured (-1) it will aways return true, as
-         // this method was probably called by global-depage
-         return getMaxSizeBytes() <= 0 || getAddressSize() < getMaxSizeBytes();
-      }
+   }
 
+   /**
+    * @return
+    */
+   private boolean isFull(final long nextPageSize)
+   {
+      return getMaxSizeBytes() > 0 && getAddressSize() + nextPageSize > getMaxSizeBytes();
    }
 
+   /**
+    * @param nextPageSize
+    * @return
+    */
+   private boolean isGlobalFull(final long nextPageSize)
+   {
+      return pagingManager.getMaxGlobalSize() > 0 && pagingManager.getGlobalSize() + nextPageSize > pagingManager.getMaxGlobalSize();
+   }
+
    private long addAddressSize(final long delta)
    {
       return sizeInBytes.addAndGet(delta);
    }
 
-   private synchronized void clearDequeueThread()
+   /**
+    * startDepaging and clearDepage needs to be atomic.
+    * We can't use writeLock to this operation as writeLock would still be used by another thread, and still being a valid usage
+    * @return true if the depage status was cleared
+    */
+   private synchronized boolean clearDepage()
    {
-      depageAction = null;
+      final boolean pageFull = isFull(getPageSizeBytes());
+      final boolean globalFull = isGlobalFull(getPageSizeBytes());
+      if (pageFull || globalFull)
+      {
+         depaging.set(false);
+         if (!globalFull)
+         {
+            pagingManager.setGlobalPageMode(false);
+         }
+         return true;
+      }
+      else
+      {
+         return false;
+      }
    }
 
    private void openNewPage() throws Exception
@@ -851,7 +881,7 @@
 
          currentPage = createPage(currentPageId);
 
-         pageUsedSize.set(0);
+         currentPageSize.set(0);
 
          currentPage.open();
       }
@@ -912,7 +942,7 @@
     * @return
     * @throws Exception
     */
-   private boolean readPage() throws Exception
+   private void readPage() throws Exception
    {
       Page page = depage();
 
@@ -925,18 +955,16 @@
 
          lastPageRecord = null;
 
-         return false;
+         return;
       }
 
       page.open();
 
       List<PagedMessage> messages = page.read();
 
-      boolean addressNotFull = onDepage(page.getPageId(), storeName, messages);
+      onDepage(page.getPageId(), storeName, messages);
 
       page.delete();
-
-      return addressNotFull;
    }
 
    // Inner classes -------------------------------------------------
@@ -956,14 +984,14 @@
          {
             if (running)
             {
-               boolean needMorePages = readPage();
-               if (needMorePages)
+               if (!isFull(getPageSizeBytes()) && !isGlobalFull(getPageSizeBytes()))
                {
-                  followingExecutor.execute(this);
+                  readPage();
                }
-               else
+               // Note: clearDepage is an atomic operation, it needs to be done even if readPage was not executed because the page was full
+               if (!clearDepage())
                {
-                  clearDequeueThread();
+                  followingExecutor.execute(this);
                }
             }
          }

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -187,7 +187,10 @@
 
    public void deleteFile() throws MessagingException
    {
-      storageManager.deleteFile(file);
+      if (file != null)
+      {
+         storageManager.deleteFile(file);
+      }
    }
 
    @Override

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -502,6 +502,8 @@
 
       storageManager.loadMessages(this, queues, resourceManager);
 
+      // This is necessary as if the server was previously stopped while a depage was being executed,
+      // it needs to resume the depage process on those destinations
       pagingManager.startGlobalDepage();
 
    }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -28,7 +28,6 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ADD_DESTINATION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CHUNK_SEND;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
@@ -40,11 +39,13 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_CONTINUATION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REMOVE_DESTINATION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REPLICATE_DELIVERY;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_CONTINUATION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
@@ -113,10 +114,11 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -357,14 +359,14 @@
    {
       return new ArrayList<FailureListener>(failureListeners);
    }
-   
+
    public void setFailureListeners(final List<FailureListener> listeners)
    {
       this.failureListeners.clear();
-      
+
       this.failureListeners.addAll(listeners);
    }
-   
+
    public Object getID()
    {
       return transportConnection.getID();
@@ -390,7 +392,7 @@
    }
 
    public void addFailureListener(final FailureListener listener)
-   {   
+   {
       if (listener == null)
       {
          throw new IllegalStateException("FailureListener cannot be null");
@@ -418,7 +420,7 @@
    {
       return replicatingConnection;
    }
-   
+
    public void setReplicatingConnection(final RemotingConnection connection)
    {
       this.replicatingConnection = connection;
@@ -552,7 +554,7 @@
          try
          {
             boolean callNext = listener.connectionFailed(me);
-            
+
             if (!callNext)
             {
                break;
@@ -839,11 +841,16 @@
             packet = new NullResponseMessage();
             break;
          }
-         case SESS_CHUNK_SEND:
+         case SESS_RECEIVE_CONTINUATION:
          {
-            packet = new SessionSendChunkMessage();
+            packet = new SessionReceiveContinuationMessage();
             break;
          }
+         case SESS_SEND_CONTINUATION:
+         {
+            packet = new SessionSendContinuationMessage();
+            break;
+         }
          case SESS_REPLICATE_DELIVERY:
          {
             packet = new SessionReplicateDeliveryMessage();
@@ -1152,9 +1159,9 @@
             if (replicatingChannel != null)
             {
                DelayedResult result = new DelayedResult();
- 
+
                responseActions.add(result);
-               
+
                responseActionCount++;
 
                replicatingChannel.send(packet);
@@ -1190,7 +1197,7 @@
                   break;
                }
             }
-            
+
             responseActionCount = 0;
          }
       }
@@ -1212,7 +1219,7 @@
       // This will never get called concurrently by more than one thread
 
       private int responseActionCount;
-      
+
       // TODO it's not ideal synchronizing this since it forms a contention point with replication
       // but we need to do this to protect it w.r.t. the check on replicatingChannel
       public void replicateResponseReceived()
@@ -1228,7 +1235,7 @@
                if (result == null)
                {
                   throw new IllegalStateException("Cannot find response action");
-               }                            
+               }
             }
          }
 
@@ -1236,12 +1243,12 @@
          if (result != null)
          {
             result.replicated();
-            
-            //TODO - we can optimise this not to lock every time - only if waiting for all replications to return
+
+            // TODO - we can optimise this not to lock every time - only if waiting for all replications to return
             synchronized (replicationLock)
             {
                responseActionCount--;
-               
+
                if (responseActionCount == 0)
                {
                   replicationLock.notify();
@@ -1249,34 +1256,34 @@
             }
          }
       }
-      
+
       private void waitForAllReplicationResponse()
-      {       
+      {
          synchronized (replicationLock)
          {
             if (replicatingChannel != null)
             {
                long toWait = 10000; // TODO don't hardcode timeout
-               
+
                long start = System.currentTimeMillis();
-               
+
                while (responseActionCount > 0 && toWait > 0)
-               {                       
+               {
                   try
                   {
                      replicationLock.wait();
                   }
                   catch (InterruptedException e)
-                  {                     
+                  {
                   }
-                  
+
                   long now = System.currentTimeMillis();
 
                   toWait -= now - start;
 
-                  start = now;                                                     
+                  start = now;
                }
-               
+
                if (toWait <= 0)
                {
                   log.warn("Timed out waiting for replication responses to return");
@@ -1318,16 +1325,16 @@
       public void transferConnection(final RemotingConnection newConnection)
       {
          // Needs to synchronize on the connection to make sure no packets from
-         // the old connection get processed after transfer has occurred         
+         // the old connection get processed after transfer has occurred
          synchronized (connection.transferLock)
          {
             connection.channels.remove(id);
-            
-            //If we're reconnecting to a live node which is replicated then there will be a replicating channel
-            //too. We need to then make sure that all replication responses come back since packets aren't
-            //considered confirmed until response comes back and is processed. Otherwise responses to previous
-            //message sends could come back after reconnection resulting in clients resending same message
-            //since it wasn't confirmed yet.
+
+            // If we're reconnecting to a live node which is replicated then there will be a replicating channel
+            // too. We need to then make sure that all replication responses come back since packets aren't
+            // considered confirmed until response comes back and is processed. Otherwise responses to previous
+            // message sends could come back after reconnection resulting in clients resending same message
+            // since it wasn't confirmed yet.
             waitForAllReplicationResponse();
 
             // And switch it
@@ -1342,7 +1349,7 @@
             rnewConnection.channels.put(id, this);
 
             connection = rnewConnection;
-         }         
+         }
       }
 
       public void replayCommands(final int otherLastReceivedCommandID)
@@ -1350,7 +1357,7 @@
          clearUpTo(otherLastReceivedCommandID);
 
          for (final Packet packet : resendCache)
-         {            
+         {
             doWrite(packet);
          }
       }
@@ -1621,7 +1628,7 @@
                channel.replicatingChannelDead();
             }
          }
-         
+
          return true;
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -137,15 +137,17 @@
 
    public static final byte SESS_SEND = 76;
 
-   public static final byte SESS_CONSUMER_CLOSE = 77;
+   public static final byte SESS_SEND_CONTINUATION = 77;
 
+   public static final byte SESS_CONSUMER_CLOSE = 78;
+
    public static final byte SESS_RECEIVE_MSG = 79;
 
-   public static final byte SESS_FAILOVER_COMPLETE = 80;
+   public static final byte SESS_RECEIVE_CONTINUATION = 80;
 
-   public static final byte SESS_REPLICATE_DELIVERY = 81;
+   public static final byte SESS_FAILOVER_COMPLETE = 81;
 
-   public static final byte SESS_CHUNK_SEND = 95;
+   public static final byte SESS_REPLICATE_DELIVERY = 82;
 
    // Static --------------------------------------------------------
 

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionContinuationMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionContinuationMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionContinuationMessage.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -0,0 +1,117 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
+
+/**
+ * A SessionContinuationMessage
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Dec 5, 2008 10:08:40 AM
+ *
+ *
+ */
+public abstract class SessionContinuationMessage extends PacketImpl
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private byte[] body;
+
+   private boolean continues;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SessionContinuationMessage(byte type,
+                                     final byte[] body,
+                                     final boolean continues)
+   {
+      super(type);
+      this.body = body;
+      this.continues = continues;
+   }
+
+   public SessionContinuationMessage(byte type)
+   {
+      super(type);
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return the body
+    */
+   public byte[] getBody()
+   {
+      return body;
+   }
+
+   /**
+    * @return the continues
+    */
+   public boolean isContinues()
+   {
+      return continues;
+   }
+
+   @Override
+   public int getRequiredBufferSize()
+   {
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_INT +
+             body.length +
+             DataConstants.SIZE_BOOLEAN;
+   }
+
+   @Override
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.putInt(body.length);
+      buffer.putBytes(body);
+      buffer.putBoolean(continues);
+   }
+
+   @Override
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      int size = buffer.getInt();
+      body = new byte[size];
+      buffer.getBytes(body);
+      continues = buffer.getBoolean();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}


Property changes on: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionContinuationMessage.java
___________________________________________________________________
Name: svn:mergeinfo
   + 

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveContinuationMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveContinuationMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveContinuationMessage.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -0,0 +1,111 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
+
+/**
+ * A SessionSendContinuationMessage
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Dec 4, 2008 12:25:14 PM
+ *
+ *
+ */
+public class SessionReceiveContinuationMessage extends SessionContinuationMessage
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long consumerID;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param type
+    */
+   public SessionReceiveContinuationMessage()
+   {
+      super(SESS_RECEIVE_CONTINUATION);
+   }
+
+   /**
+    * @param type
+    * @param body
+    * @param continues
+    * @param requiresResponse
+    */
+   public SessionReceiveContinuationMessage(final long consumerID,
+                                            final byte[] body,
+                                            final boolean continues,
+                                            final boolean requiresResponse)
+   {
+      super(SESS_RECEIVE_CONTINUATION, body, continues);
+      this.consumerID = consumerID;
+   }
+
+   /**
+    * @return the consumerID
+    */
+   public long getConsumerID()
+   {
+      return consumerID;
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public int getRequiredBufferSize()
+   {
+      return super.getRequiredBufferSize() + DataConstants.SIZE_LONG;
+   }
+
+   @Override
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      super.encodeBody(buffer);
+      buffer.putLong(consumerID);
+   }
+
+   @Override
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      super.decodeBody(buffer);
+      consumerID = buffer.getLong();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -46,6 +46,10 @@
 
    private long consumerID;
 
+   private boolean largeMessage;
+
+   private byte[] largeMessageHeader;
+
    private ClientMessage clientMessage;
 
    private ServerMessage serverMessage;
@@ -56,6 +60,19 @@
 
    // Constructors --------------------------------------------------
 
+   public SessionReceiveMessage(final long consumerID, final byte[] largeMessageHeader, final int deliveryCount)
+   {
+      super(SESS_RECEIVE_MSG);
+
+      this.consumerID = consumerID;
+
+      this.largeMessageHeader = largeMessageHeader;
+
+      this.deliveryCount = deliveryCount;
+
+      this.largeMessage = true;
+   }
+
    public SessionReceiveMessage(final long consumerID, final ServerMessage message, final int deliveryCount)
    {
       super(SESS_RECEIVE_MSG);
@@ -67,6 +84,8 @@
       this.clientMessage = null;
 
       this.deliveryCount = deliveryCount;
+
+      this.largeMessage = false;
    }
 
    public SessionReceiveMessage()
@@ -91,22 +110,57 @@
       return serverMessage;
    }
 
+   public byte[] getLargeMessageHeader()
+   {
+      return largeMessageHeader;
+   }
+   
+   /**
+    * @return the largeMessage
+    */
+   public boolean isLargeMessage()
+   {
+      return largeMessage;
+   }
+
    public int getDeliveryCount()
    {
       return deliveryCount;
    }
 
-   
    public int getRequiredBufferSize()
    {
-      return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT + serverMessage.getEncodeSize();
+      if (largeMessage)
+      {
+         return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG +
+                DataConstants.SIZE_INT +
+                DataConstants.SIZE_BOOLEAN +
+                DataConstants.SIZE_INT +
+                largeMessageHeader.length;
+      }
+      else
+      {
+         return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG +
+                DataConstants.SIZE_INT +
+                DataConstants.SIZE_BOOLEAN +
+                serverMessage.getEncodeSize();
+      }
    }
-   
+
    public void encodeBody(final MessagingBuffer buffer)
    {
       buffer.putLong(consumerID);
       buffer.putInt(deliveryCount);
-      serverMessage.encode(buffer);
+      buffer.putBoolean(largeMessage);
+      if (largeMessage)
+      {
+         buffer.putInt(largeMessageHeader.length);
+         buffer.putBytes(largeMessageHeader);
+      }
+      else
+      {
+         serverMessage.encode(buffer);
+      }
    }
 
    public void decodeBody(final MessagingBuffer buffer)
@@ -117,11 +171,20 @@
 
       deliveryCount = buffer.getInt();
 
-      clientMessage = new ClientMessageImpl(deliveryCount);
+      largeMessage = buffer.getBoolean();
 
-      clientMessage.decode(buffer);
-
-      clientMessage.getBody().flip();
+      if (largeMessage)
+      {
+         int size = buffer.getInt();
+         largeMessageHeader = new byte[size];
+         buffer.getBytes(largeMessageHeader);
+      }
+      else
+      {
+         clientMessage = new ClientMessageImpl(deliveryCount);
+         clientMessage.decode(buffer);
+         clientMessage.getBody().flip();
+      }
    }
 
    // Package protected ---------------------------------------------

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -1,197 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.util.DataConstants;
-
-/**
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- * 
- * @version <tt>$Revision$</tt>
- */
-public class SessionSendChunkMessage extends PacketImpl
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private long targetID;
-
-   private byte[] header;
-
-   private byte[] body;
-
-   private boolean continues;
-
-   private long messageID;
-
-   private boolean requiresResponse;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public SessionSendChunkMessage(final long targetID,
-                                  final byte[] header,
-                                  final byte[] body,
-                                  final boolean continues,
-                                  final boolean requiresResponse)
-   {
-      super(SESS_CHUNK_SEND);
-      this.targetID = targetID;
-      this.header = header;
-      this.body = body;
-      this.continues = continues;
-      this.requiresResponse = requiresResponse;
-   }
-
-   public SessionSendChunkMessage()
-   {
-      super(SESS_CHUNK_SEND);
-   }
-
-   // Public --------------------------------------------------------
-
-   public long getTargetID()
-   {
-      return targetID;
-   }
-
-   public boolean isRequiresResponse()
-   {
-      return requiresResponse;
-   }
-
-   public byte[] getHeader()
-   {
-      return header;
-   }
-
-   public byte[] getBody()
-   {
-      return body;
-   }
-
-   public long getMessageID()
-   {
-      return messageID;
-   }
-
-   public void setMessageID(final long messageId)
-   {
-      messageID = messageId;
-   }
-
-   public boolean isContinues()
-   {
-      return continues;
-   }
-
-   @Override
-   public int getRequiredBufferSize()
-   {
-      return DEFAULT_PACKET_SIZE + DataConstants.SIZE_LONG /* TargetID */+
-             DataConstants.SIZE_INT /* HeaderLength */+
-             (header != null ? header.length : 0) /* Header bytes */+
-             DataConstants.SIZE_INT /* BodyLength */+
-             body.length /* Body bytes */+
-             DataConstants.SIZE_BOOLEAN /* hasContinuations */+
-             DataConstants.SIZE_BOOLEAN /* requiresResponse */+
-             DataConstants.SIZE_BOOLEAN /* has MessageId */+
-             (messageID > 0 ? DataConstants.SIZE_LONG : 0);
-   }
-
-   @Override
-   public void encodeBody(final MessagingBuffer buffer)
-   {
-      buffer.putLong(targetID);
-
-      if (header != null)
-      {
-         buffer.putInt(header.length);
-         buffer.putBytes(header);
-      }
-      else
-      {
-         buffer.putInt(0);
-      }
-
-      buffer.putInt(body.length);
-      buffer.putBytes(body);
-
-      buffer.putBoolean(continues);
-
-      buffer.putBoolean(requiresResponse);
-
-      buffer.putBoolean(messageID > 0);
-
-      if (messageID > 0)
-      {
-         buffer.putLong(messageID);
-      }
-      
-   }
-
-   @Override
-   public void decodeBody(final MessagingBuffer buffer)
-   {
-      targetID = buffer.getLong();
-
-      final int headerLength = buffer.getInt();
-
-      if (headerLength > 0)
-      {
-         header = new byte[headerLength];
-         buffer.getBytes(header);
-      }
-      else
-      {
-         header = null;
-      }
-
-      final int bodyLength = buffer.getInt();
-
-      body = new byte[bodyLength];
-      buffer.getBytes(body);
-
-      continues = buffer.getBoolean();
-
-      requiresResponse = buffer.getBoolean();
-
-      final boolean hasMessageID = buffer.getBoolean();
-
-      if (hasMessageID)
-      {
-         messageID = buffer.getLong();
-      }
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendContinuationMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendContinuationMessage.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -0,0 +1,113 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
+
+
+/**
+ * A SessionSendContinuationMessage
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Dec 4, 2008 12:25:14 PM
+ *
+ *
+ */
+public class SessionSendContinuationMessage extends SessionContinuationMessage
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private boolean requiresResponse;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param type
+    */
+   public SessionSendContinuationMessage()
+   {
+      super(SESS_SEND_CONTINUATION);
+   }
+
+   /**
+    * @param type
+    * @param body
+    * @param continues
+    * @param requiresResponse
+    */
+   public SessionSendContinuationMessage(final byte[] body,
+                                         final boolean continues,
+                                         final boolean requiresResponse)
+   {
+      super(SESS_SEND_CONTINUATION, body, continues);
+      this.requiresResponse = requiresResponse;
+   }
+
+
+   // Public --------------------------------------------------------
+   
+   /**
+    * @return the requiresResponse
+    */
+   public boolean isRequiresResponse()
+   {
+      return requiresResponse;
+   }
+
+   @Override
+   public int getRequiredBufferSize()
+   {
+      return super.getRequiredBufferSize() + DataConstants.SIZE_BOOLEAN;
+   }
+
+   @Override
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      super.encodeBody(buffer);
+      buffer.putBoolean(requiresResponse);
+   }
+
+   @Override
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      super.decodeBody(buffer);
+      requiresResponse = buffer.getBoolean();
+   }
+
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -18,11 +18,10 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.remoting.impl.wireformat;
 
-import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.message.Message;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.core.server.ServerMessage;
@@ -32,6 +31,7 @@
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:csuconic at redhat.com">Clebert Suconic</a>
  * 
  * @version <tt>$Revision$</tt>
  */
@@ -39,14 +39,20 @@
 {
    // Constants -----------------------------------------------------
 
-   private static final Logger log = Logger.getLogger(SessionSendMessage.class);
-   
    // Attributes ----------------------------------------------------
 
    private Message clientMessage;
+
+   private boolean largeMessage;
+
+   /** Used only if largeMessage */
+   private byte[] largeMessageHeader;
    
+   /** We need to set the MessageID when replicating this on the server */
+   private long largeMessageId = -1;
+
    private ServerMessage serverMessage;
-   
+
    private boolean requiresResponse;
 
    // Static --------------------------------------------------------
@@ -57,74 +63,188 @@
    {
       super(SESS_SEND);
 
-      this.clientMessage = message;
-      
+      clientMessage = message;
+
       this.requiresResponse = requiresResponse;
+
+      largeMessage = false;
    }
-      
+
+   public SessionSendMessage(final byte[] largeMessageHeader, final boolean requiresResponse)
+   {
+      super(SESS_SEND);
+
+      this.largeMessageHeader = largeMessageHeader;
+
+      this.requiresResponse = requiresResponse;
+
+      largeMessage = true;
+   }
+
    public SessionSendMessage()
    {
       super(SESS_SEND);
    }
-   
+
    // Public --------------------------------------------------------
 
+   
+   public boolean isLargeMessage()
+   {
+      return largeMessage;
+   }
+   
    public Message getClientMessage()
    {
       return clientMessage;
    }
-   
+
    public ServerMessage getServerMessage()
    {
       return serverMessage;
    }
-   
+
+   public byte[] getLargeMessageHeader()
+   {
+      return largeMessageHeader;
+   }
+
    public boolean isRequiresResponse()
    {
       return requiresResponse;
    }
    
+   /**
+    * @return the largeMessageId
+    */
+   public long getMessageID()
+   {
+      if (largeMessage)
+      {
+         return largeMessageId;
+      }
+      else
+      {
+         return serverMessage.getMessageID();
+      }
+   }
+
+   /**
+    * @param largeMessageId the largeMessageId to set
+    */
+   public void setMessageID(long id)
+   {
+      if (largeMessage)
+      {
+         this.largeMessageId = id;
+      }
+      else
+      {
+         serverMessage.setMessageID(id);
+      }
+   }
+
+   @Override
    public void encodeBody(final MessagingBuffer buffer)
    {
-      if (clientMessage != null)
+      buffer.putBoolean(largeMessage);
+
+      if (largeMessage)
       {
+         buffer.putInt(largeMessageHeader.length);
+         buffer.putBytes(largeMessageHeader);
+         
+         if (largeMessageId > 0)
+         {
+            buffer.putBoolean(true);
+            buffer.putLong(largeMessageId);
+         }
+         else
+         {
+            buffer.putBoolean(false);
+         }
+      }
+      else if (clientMessage != null)
+      {
          clientMessage.encode(buffer);
       }
       else
       {
-         //If we're replicating a buffer to a backup node then we encode the serverMessage not the clientMessage
+         // If we're replicating a buffer to a backup node then we encode the serverMessage not the clientMessage
          serverMessage.encode(buffer);
       }
-      
+
       buffer.putBoolean(requiresResponse);
    }
-   
+
+   @Override
    public void decodeBody(final MessagingBuffer buffer)
    {
-      //TODO can be optimised
-      
-      serverMessage = new ServerMessageImpl();
-      
-      serverMessage.decode(buffer);
-      
-      serverMessage.getBody().flip();
-      
-      requiresResponse = buffer.getBoolean();
+      largeMessage = buffer.getBoolean();
+
+      if (largeMessage)
+      {
+         int largeMessageLength = buffer.getInt();
+
+         largeMessageHeader = new byte[largeMessageLength];
+
+         buffer.getBytes(largeMessageHeader);
+         
+         final boolean largeMessageIDFilled = buffer.getBoolean();
+         
+         if (largeMessageIDFilled)
+         {
+            this.largeMessageId = buffer.getLong();
+         }
+         else
+         {
+            this.largeMessageId = -1;
+         }
+      }
+      else
+      {
+         // TODO can be optimised
+
+         serverMessage = new ServerMessageImpl();
+
+         serverMessage.decode(buffer);
+
+         serverMessage.getBody().flip();
+
+         requiresResponse = buffer.getBoolean();
+      }
    }
 
+   @Override
    public int getRequiredBufferSize()
    {
-      if (clientMessage != null)
+      if (largeMessage)
       {
-         return BASIC_PACKET_SIZE + clientMessage.getEncodeSize() + DataConstants.SIZE_BOOLEAN;
+         return BASIC_PACKET_SIZE +
+                // IsLargeMessage
+                DataConstants.SIZE_BOOLEAN +
+                // BufferSize
+                DataConstants.SIZE_INT +
+                // Bytes sent
+                largeMessageHeader.length +
+                // LargeMessageID (if > 0) and a boolean statying if the largeMessageID is set
+                DataConstants.SIZE_BOOLEAN + (largeMessageId >= 0 ? DataConstants.SIZE_LONG : 0) + 
+                DataConstants.SIZE_BOOLEAN;
       }
+      else if (clientMessage != null)
+      {
+         return DataConstants.SIZE_BOOLEAN + BASIC_PACKET_SIZE +
+                clientMessage.getEncodeSize() +
+                DataConstants.SIZE_BOOLEAN;
+      }
       else
       {
-         return BASIC_PACKET_SIZE + serverMessage.getEncodeSize() + DataConstants.SIZE_BOOLEAN;
+         return DataConstants.SIZE_BOOLEAN + BASIC_PACKET_SIZE +
+                serverMessage.getEncodeSize() +
+                DataConstants.SIZE_BOOLEAN;
       }
    }
 
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -37,7 +37,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -66,7 +66,7 @@
    String getUsername();
 
    String getPassword();
-   
+
    int getMinLargeMessageSize();
 
    Object getConnectionID();
@@ -74,11 +74,11 @@
    void removeConsumer(ServerConsumer consumer) throws Exception;
 
    void close() throws Exception;
-   
+
    void promptDelivery(Queue queue);
 
    void handleAcknowledge(final SessionAcknowledgeMessage packet);
-   
+
    void handleExpired(final SessionExpiredMessage packet);
 
    void handleRollback(Packet packet);
@@ -110,15 +110,15 @@
    void handleSetXATimeout(SessionXASetTimeoutMessage packet);
 
    void handleAddDestination(SessionAddDestinationMessage packet);
-   
+
    void handleStart(Packet packet);
-   
+
    void handleStop(Packet packet);
 
    void handleRemoveDestination(SessionRemoveDestinationMessage packet);
 
    void handleCreateQueue(SessionCreateQueueMessage packet);
-  
+
    void handleDeleteQueue(SessionDeleteQueueMessage packet);
 
    void handleCreateConsumer(SessionCreateConsumerMessage packet);
@@ -131,18 +131,20 @@
 
    void handleReceiveConsumerCredits(SessionConsumerFlowCreditMessage packet);
 
-   public void handleSendChunkMessage(SessionSendChunkMessage packet);
+   void handleSendContinuations(SessionSendContinuationMessage packet);
 
    void handleSend(SessionSendMessage packet);
 
+   void handleSendLargeMessage(SessionSendMessage packet);
+
    void handleFailedOver(Packet packet);
-   
+
    void handleClose(Packet packet);
-   
+
    void handleReplicatedDelivery(SessionReplicateDeliveryMessage packet);
-   
+
    int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
-   
+
    Channel getChannel();
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -43,7 +43,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.MessageReference;
@@ -180,15 +180,15 @@
       // Otherwise we could end up with a situation where a close comes in, then a delivery comes in,
       // then close gets replicated to backup, then delivery gets replicated, but consumer is already
       // closed!
-//      lock.lock();
-//      try
-//      {
-         setStarted(false);
-//      }
-//      finally
-//      {
-//         lock.unlock();
-//      }
+      // lock.lock();
+      // try
+      // {
+      setStarted(false);
+      // }
+      // finally
+      // {
+      // lock.unlock();
+      // }
 
       DelayedResult result = channel.replicatePacket(packet);
 
@@ -239,15 +239,15 @@
 
    public void close() throws Exception
    {
-//      lock.lock();
-//      try
-//      {
-         setStarted(false);
-//      }
-//      finally
-//      {
-//         lock.unlock();
-//      }
+      // lock.lock();
+      // try
+      // {
+      setStarted(false);
+      // }
+      // finally
+      // {
+      // lock.unlock();
+      // }
 
       doClose();
    }
@@ -545,24 +545,26 @@
          {
             deliveringRefs.add(ref);
          }
-         
+
+         // TODO: get rid of the instanceof by something like message.isLargeMessage()
          if (message instanceof ServerLargeMessage)
          {
             // TODO: How to inform the backup node about the LargeMessage being sent?
-            largeMessageSender = new LargeMessageSender((ServerLargeMessage)message);
+            largeMessageSender = new LargeMessageSender((ServerLargeMessage)message, ref);
 
             largeMessageSender.sendLargeMessage();
          }
          else
          {
             sendStandardMessage(ref, message);
-         }
 
-         if (preAcknowledge)
-         {
-            doAck(ref);
+            if (preAcknowledge)
+            {
+               doAck(ref);
+            }
          }
 
+
          return HandleStatus.HANDLED;
       }
       finally
@@ -589,7 +591,7 @@
       if (result == null)
       {
          // Not replicated - just send now
-         
+
          channel.send(packet);
       }
       else
@@ -598,7 +600,7 @@
          result.setResultRunner(new Runnable()
          {
             public void run()
-            {               
+            {
                channel.send(packet);
             }
          });
@@ -617,16 +619,22 @@
       /** The current message being processed */
       private ServerLargeMessage pendingLargeMessage;
 
+      private final MessageReference ref;
+
+      private boolean sentFirstMessage = false;
+
       /** The current position on the message being processed */
       private long positionPendingLargeMessage;
 
-      private SessionSendChunkMessage readAheadChunk;
+      private SessionReceiveContinuationMessage readAheadChunk;
 
-      public LargeMessageSender(final ServerLargeMessage message)
+      public LargeMessageSender(final ServerLargeMessage message, final MessageReference ref)
       {
          pendingLargeMessage = (ServerLargeMessage)message;
 
          sizePendingLargeMessage = pendingLargeMessage.getBodySize();
+
+         this.ref = ref;
       }
 
       public boolean sendLargeMessage()
@@ -645,20 +653,42 @@
                return false;
             }
 
+            if (!sentFirstMessage)
+            {
+
+               sentFirstMessage = true;
+
+               MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(pendingLargeMessage.getPropertiesEncodeSize()));
+               pendingLargeMessage.encodeProperties(headerBuffer);
+
+               SessionReceiveMessage initialMessage = new SessionReceiveMessage(id,
+                                                                                headerBuffer.array(),
+                                                                                ref.getDeliveryCount() + 1);
+
+               channel.send(initialMessage);
+
+               if (availableCredits != null)
+               {
+                  // RequiredBufferSize on this case represents the right number of bytes sent
+                  availableCredits.addAndGet(-pendingLargeMessage.getPropertiesEncodeSize());
+               }
+            }
+
             if (readAheadChunk != null)
             {
                int chunkLen = readAheadChunk.getBody().length;
-               
+
                positionPendingLargeMessage += chunkLen;
-               
-               channel.send(readAheadChunk);
-               
-               readAheadChunk = null;
-               
+
                if (availableCredits != null)
                {
                   availableCredits.addAndGet(-chunkLen);
                }
+
+               channel.send(readAheadChunk);
+
+               readAheadChunk = null;
+
             }
 
             while (positionPendingLargeMessage < sizePendingLargeMessage)
@@ -672,7 +702,7 @@
                   return false;
                }
 
-               SessionSendChunkMessage chunk = createChunkSend();
+               SessionReceiveContinuationMessage chunk = createChunkSend();
 
                int chunkLen = chunk.getBody().length;
 
@@ -689,7 +719,20 @@
             pendingLargeMessage.releaseResources();
 
             ServerConsumerImpl.this.largeMessageSender = null;
+            
+            if (preAcknowledge)
+            {
+               try
+               {
+                  doAck(ref);
+               }
+               catch (Exception e)
+               {
+                  log.warn("Error while ACKing reference " + ref, e);
+               }
+            }
 
+
             return true;
          }
          finally
@@ -698,45 +741,20 @@
          }
       }
 
-      private SessionSendChunkMessage createChunkSend()
+      private SessionReceiveContinuationMessage createChunkSend()
       {
-         SessionSendChunkMessage chunk;
+         SessionReceiveContinuationMessage chunk;
 
          int localChunkLen = 0;
 
-         if (positionPendingLargeMessage == 0)
-         {
-            int headerSize = pendingLargeMessage.getPropertiesEncodeSize();
+         localChunkLen = (int)Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
 
-            localChunkLen = minLargeMessageSize - headerSize;
+         MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)localChunkLen));
 
-            MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(pendingLargeMessage.getPropertiesEncodeSize()));
-            pendingLargeMessage.encodeProperties(headerBuffer);
+         pendingLargeMessage.encodeBody(bodyBuffer, positionPendingLargeMessage, localChunkLen);
 
-            MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)localChunkLen));
-            pendingLargeMessage.encodeBody(bodyBuffer, 0, localChunkLen);
+         chunk = new SessionReceiveContinuationMessage(id, bodyBuffer.array(), positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false); 
 
-            chunk = new SessionSendChunkMessage(id,
-                                                headerBuffer.array(),
-                                                bodyBuffer.array(),
-                                                localChunkLen < sizePendingLargeMessage,
-                                                false);
-         }
-         else
-         {
-            localChunkLen = (int)Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
-
-            MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)localChunkLen));
-
-            pendingLargeMessage.encodeBody(bodyBuffer, positionPendingLargeMessage, localChunkLen);
-
-            chunk = new SessionSendChunkMessage(id,
-                                                null,
-                                                bodyBuffer.array(),
-                                                positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage,
-                                                false);
-         }
-
          return chunk;
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -61,7 +61,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -1956,7 +1956,7 @@
          result = channel.replicatePacket(packet);
 
          // note we process start before response is back from the backup
-         
+
          setStarted(true);
       }
       finally
@@ -2157,68 +2157,40 @@
       }
    }
 
-   public void handleSendChunkMessage(final SessionSendChunkMessage packet)
+   public void handleSendLargeMessage(final SessionSendMessage packet)
    {
-      if (packet.getMessageID() == 0)
-      {
-         packet.setMessageID(storageManager.generateUniqueID());
-      }
 
-      Packet response = null;
+      DelayedResult result = channel.replicatePacket(packet);
 
-      // TODO: Replication on ChunkMessages
-
-      try
+      if (packet.getMessageID() <= 0L)
       {
-         if (packet.getHeader() != null)
-         {
-            largeMessage = createLargeMessageStorage(packet.getMessageID(), packet.getHeader());
-         }
+         // must generate message id here, so we know they are in sync on live and backup
+         long id = storageManager.generateUniqueID();
 
-         largeMessage.addBytes(packet.getBody());
+         packet.setMessageID(id);
+      }
 
-         if (!packet.isContinues())
-         {
-            final ServerLargeMessage message = largeMessage;
 
-            largeMessage = null;
+      // With a send we must make sure it is replicated to backup before being processed on live
+      // or can end up with delivery being processed on backup before original send
 
-            message.complete();
-
-            send(message);
-         }
-
-         if (packet.isRequiresResponse())
-         {
-            response = new NullResponseMessage();
-         }
+      if (result == null)
+      {
+         doSendLargeMessage(packet);
       }
-      catch (Exception e)
+      else
       {
-         log.error("Failed to send message", e);
-
-         if (packet.isRequiresResponse())
+         result.setResultRunner(new Runnable()
          {
-            if (e instanceof MessagingException)
+            public void run()
             {
-               response = new MessagingExceptionMessage((MessagingException)e);
+               doSendLargeMessage(packet);
             }
-            else
-            {
-               response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
-            }
-         }
+         });
       }
-
-      channel.confirm(packet);
-
-      if (response != null)
-      {
-         channel.send(response);
-      }
-
+      
    }
-
+   
    public void handleSend(final SessionSendMessage packet)
    {
       // With a send we must make sure it is replicated to backup before being processed on live
@@ -2239,14 +2211,14 @@
          lock = null;
       }
 
-      if (msg.getMessageID() == 0L)
+      if (packet.getMessageID() <= 0L)
       {
          // must generate message id here, so we know they are in sync on live and backup
          long id = storageManager.generateUniqueID();
 
-         msg.setMessageID(id);
+         packet.setMessageID(id);
       }
-      
+
       if (channel.getReplicatingChannel() != null)
       {
          msg.putBooleanProperty(new SimpleString("clustered"), true);
@@ -2274,70 +2246,34 @@
          });
       }
    }
-
-   private void doSend(final SessionSendMessage packet)
+   
+   public void handleSendContinuations(final SessionSendContinuationMessage packet)
    {
-      Packet response = null;
 
-      try
-      {
-         ServerMessage message = packet.getServerMessage();
+      DelayedResult result = channel.replicatePacket(packet);
 
-         if (message.getDestination().equals(managementAddress))
-         {
-            // It's a management message
+      // With a send we must make sure it is replicated to backup before being processed on live
+      // or can end up with delivery being processed on backup before original send
 
-            handleManagementMessage(message);
-         }
-         else
-         {
-            send(message);
-         }
-
-         if (packet.isRequiresResponse())
-         {
-            response = new NullResponseMessage();
-         }
+      if (result == null)
+      {
+         doSendContinuations(packet);
       }
-      catch (Exception e)
+      else
       {
-         log.error("Failed to send message", e);
-
-         if (packet.isRequiresResponse())
+         result.setResultRunner(new Runnable()
          {
-            if (e instanceof MessagingException)
+            public void run()
             {
-               response = new MessagingExceptionMessage((MessagingException)e);
+               doSendContinuations(packet);
             }
-            else
-            {
-               response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
-            }
-         }
+         });
       }
 
-      channel.confirm(packet);
 
-      if (response != null)
-      {
-         channel.send(response);
-      }
    }
 
-   private void handleManagementMessage(final ServerMessage message) throws Exception
-   {
-      doSecurity(message);
-
-      managementService.handleMessage(message);
-      
-      SimpleString replyTo = (SimpleString)message.getProperty(ClientMessageImpl.REPLYTO_HEADER_NAME);
-      if (replyTo != null)
-      {
-         message.setDestination(replyTo);
-         send(message);
-      }
-   }
-
+   
    public void handleReplicatedDelivery(final SessionReplicateDeliveryMessage packet)
    {
       ServerConsumer consumer = consumers.get(packet.getConsumerID());
@@ -2360,25 +2296,25 @@
    public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
    {
       boolean wasStarted = this.started;
-      
+
       if (wasStarted)
       {
          this.setStarted(false);
       }
-      
+
       remotingConnection.removeFailureListener(this);
 
       channel.transferConnection(newConnection);
-      
+
       RemotingConnection oldReplicatingConnection = newConnection.getReplicatingConnection();
-      
+
       if (oldReplicatingConnection != null)
       {
          oldReplicatingConnection.destroy();
       }
-      
+
       newConnection.setReplicatingConnection(remotingConnection.getReplicatingConnection());
-      
+
       remotingConnection.setReplicatingConnection(null);
 
       newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
@@ -2393,12 +2329,12 @@
       int serverLastReceivedCommandID = channel.getLastReceivedCommandID();
 
       channel.replayCommands(lastReceivedCommandID);
-      
+
       if (wasStarted)
       {
          this.setStarted(true);
       }
-      
+
       return serverLastReceivedCommandID;
    }
 
@@ -2415,7 +2351,7 @@
       try
       {
          log.info("Connection timed out, so clearing up resources for session " + name);
-         
+
          for (Runnable runner : failureRunners)
          {
             try
@@ -2452,6 +2388,167 @@
    // Private
    // ----------------------------------------------------------------------------
 
+   private void doSendLargeMessage(final SessionSendMessage packet)
+   {
+      Packet response = null;
+
+      try
+      {
+         largeMessage = createLargeMessageStorage(packet.getMessageID(), packet.getLargeMessageHeader());
+
+         if (packet.isRequiresResponse())
+         {
+            response = new NullResponseMessage();
+         }
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to send message", e);
+
+         if (packet.isRequiresResponse())
+         {
+            if (e instanceof MessagingException)
+            {
+               response = new MessagingExceptionMessage((MessagingException)e);
+            }
+            else
+            {
+               response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+            }
+         }
+      }
+
+      channel.confirm(packet);
+
+      if (response != null)
+      {
+         channel.send(response);
+      }
+   }
+
+   private void doSend(final SessionSendMessage packet)
+   {
+      Packet response = null;
+
+      try
+      {
+         ServerMessage message = packet.getServerMessage();
+
+         if (message.getDestination().equals(managementAddress))
+         {
+            // It's a management message
+
+            handleManagementMessage(message);
+         }
+         else
+         {
+            send(message);
+         }
+
+         if (packet.isRequiresResponse())
+         {
+            response = new NullResponseMessage();
+         }
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to send message", e);
+
+         if (packet.isRequiresResponse())
+         {
+            if (e instanceof MessagingException)
+            {
+               response = new MessagingExceptionMessage((MessagingException)e);
+            }
+            else
+            {
+               response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+            }
+         }
+      }
+
+      channel.confirm(packet);
+
+      if (response != null)
+      {
+         channel.send(response);
+      }
+   }
+   
+   /**
+    * @param packet
+    */
+   private void doSendContinuations(final SessionSendContinuationMessage packet)
+   {
+      Packet response = null;
+
+      try
+      {
+         
+         if (largeMessage == null)
+         {
+            throw new MessagingException(MessagingException.ILLEGAL_STATE, "large-message not initialized on server");
+         }
+         
+         largeMessage.addBytes(packet.getBody());
+
+         if (!packet.isContinues())
+         {
+            final ServerLargeMessage message = largeMessage;
+
+            largeMessage = null;
+
+            message.complete();
+
+            send(message);
+         }
+
+         if (packet.isRequiresResponse())
+         {
+            response = new NullResponseMessage();
+         }
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to send message", e);
+
+         if (packet.isRequiresResponse())
+         {
+            if (e instanceof MessagingException)
+            {
+               response = new MessagingExceptionMessage((MessagingException)e);
+            }
+            else
+            {
+               response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+            }
+         }
+      }
+
+      channel.confirm(packet);
+
+      if (response != null)
+      {
+         channel.send(response);
+      }
+   }
+
+
+   private void handleManagementMessage(final ServerMessage message) throws Exception
+   {
+      doSecurity(message);
+
+      managementService.handleMessage(message);
+      
+      SimpleString replyTo = (SimpleString)message.getProperty(ClientMessageImpl.REPLYTO_HEADER_NAME);
+      if (replyTo != null)
+      {
+         message.setDestination(replyTo);
+         send(message);
+      }
+   }
+
+
    private ServerLargeMessage createLargeMessageStorage(final long messageID, final byte[] header) throws Exception
    {
       ServerLargeMessage largeMessage = storageManager.createLargeMessage();

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -15,7 +15,6 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ACKNOWLEDGE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ADD_DESTINATION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CHUNK_SEND;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
@@ -30,6 +29,7 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REPLICATE_DELIVERY;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_CONTINUATION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
@@ -60,8 +60,8 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -72,8 +72,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
-import org.jboss.messaging.core.server.ServerLargeMessage;
-import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.ServerSession;
 
 /**
@@ -91,8 +89,7 @@
 
    private final Channel channel;
 
-   public ServerSessionPacketHandler(final ServerSession session,
-                                     final Channel channel)
+   public ServerSessionPacketHandler(final ServerSession session, final Channel channel)
 
    {
       this.session = session;
@@ -128,7 +125,7 @@
             case SESS_DELETE_QUEUE:
             {
                SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
-               session.handleDeleteQueue(request);               
+               session.handleDeleteQueue(request);
                break;
             }
             case SESS_QUEUEQUERY:
@@ -146,18 +143,18 @@
             case SESS_ACKNOWLEDGE:
             {
                SessionAcknowledgeMessage message = (SessionAcknowledgeMessage)packet;
-               session.handleAcknowledge(message);               
+               session.handleAcknowledge(message);
                break;
             }
             case SESS_EXPIRED:
             {
                SessionExpiredMessage message = (SessionExpiredMessage)packet;
-               session.handleExpired(message);               
+               session.handleExpired(message);
                break;
             }
             case SESS_COMMIT:
             {
-               session.handleCommit(packet);               
+               session.handleCommit(packet);
                break;
             }
             case SESS_ROLLBACK:
@@ -220,7 +217,7 @@
             }
             case SESS_XA_INDOUBT_XIDS:
             {
-               session.handleGetInDoubtXids(packet);               
+               session.handleGetInDoubtXids(packet);
                break;
             }
             case SESS_XA_GET_TIMEOUT:
@@ -243,7 +240,7 @@
             case SESS_REMOVE_DESTINATION:
             {
                SessionRemoveDestinationMessage message = (SessionRemoveDestinationMessage)packet;
-               session.handleRemoveDestination(message);              
+               session.handleRemoveDestination(message);
                break;
             }
             case SESS_START:
@@ -281,13 +278,20 @@
             case SESS_SEND:
             {
                SessionSendMessage message = (SessionSendMessage)packet;
-               session.handleSend(message);
+               if (message.isLargeMessage())
+               {
+                  session.handleSendLargeMessage(message);
+               }
+               else
+               {
+                  session.handleSend(message);
+               }
                break;
             }
-            case SESS_CHUNK_SEND:
+            case SESS_SEND_CONTINUATION:
             {
-               SessionSendChunkMessage message = (SessionSendChunkMessage)packet;
-               session.handleSendChunkMessage(message);
+               SessionSendContinuationMessage message = (SessionSendContinuationMessage)packet;
+               session.handleSendContinuations(message);
                break;
             }
             case SESS_REPLICATE_DELIVERY:

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -85,6 +85,7 @@
 
    protected void testChunks(final boolean realFiles,
                              final boolean useFile,
+                             final boolean preAck,
                              final int numberOfMessages,
                              final int numberOfIntegers,
                              final boolean sendingBlocking,
@@ -93,6 +94,7 @@
    {
       testChunks(realFiles,
                  useFile,
+                 preAck,
                  numberOfMessages,
                  numberOfIntegers,
                  sendingBlocking,
@@ -103,6 +105,7 @@
 
    protected void testChunks(final boolean realFiles,
                              final boolean useFile,
+                             final boolean preAck,
                              final int numberOfMessages,
                              final int numberOfIntegers,
                              final boolean sendingBlocking,
@@ -127,9 +130,11 @@
             sf.setBlockOnAcknowledge(true);
          }
 
-         ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+         ClientSession session = sf.createSession(null, null, false, true, false, preAck, 0);
 
          session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
+         
+         long initialSize = messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
 
          ClientProducer producer = session.createProducer(ADDRESS);
 
@@ -201,7 +206,7 @@
             sf = createInVMFactory();
          }
 
-         session = sf.createSession(null, null, false, true, true, false, 0);
+         session = sf.createSession(null, null, false, true, true, preAck, 0);
 
          ClientConsumer consumer = null;
 
@@ -242,7 +247,10 @@
                           System.currentTimeMillis() - originalTime >= delayDelivery);
             }
 
-            message.acknowledge();
+            if (!preAck)
+            {
+               message.acknowledge();
+            }
 
             assertNotNull(message);
 
@@ -272,7 +280,13 @@
 
          session.close();
 
+         assertEquals(initialSize, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+         assertEquals(0, messagingService.getServer().getPostOffice().getBinding(ADDRESS).getQueue().getDeliveringCount());
+         assertEquals(0, messagingService.getServer().getPostOffice().getBinding(ADDRESS).getQueue().getMessageCount());
+
          validateNoFilesOnLargeDir();
+         
+
       }
       finally
       {

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -206,22 +206,33 @@
 
    public void testMessageChunkFilePersistence() throws Exception
    {
-      testChunks(true, false, 100, 262144, false, 1000, 0);
+      testChunks(true, false, false, 100, 262144, false, 1000, 0);
    }
 
+   public void testMessageChunkFilePersistenceBlocked() throws Exception
+   {
+      testChunks(true, false, false, 100, 262144, true, 1000, 0);
+   }
+
+
+   public void testMessageChunkFilePersistenceBlockedPreCommit() throws Exception
+   {
+      testChunks(true, false, true, 100, 262144, true, 1000, 0);
+   }
+
    public void testMessageChunkFilePersistenceDelayed() throws Exception
    {
-      testChunks(true, false, 1, 50000, false, 1000, 2000);
+      testChunks(true, false, false, 1, 50000, false, 1000, 2000);
    }
 
    public void testMessageChunkNullPersistence() throws Exception
    {
-      testChunks(false, false, 1, 50000, false, 1000, 0);
+      testChunks(false, false, false, 1, 50000, false, 1000, 0);
    }
 
    public void testMessageChunkNullPersistenceDelayed() throws Exception
    {
-      testChunks(false, false, 100, 50000, false, 10000, 100);
+      testChunks(false, false, false, 100, 50000, false, 10000, 100);
    }
 
    public void testPageOnLargeMessage() throws Exception
@@ -238,44 +249,44 @@
 
    public void testSendfileMessage() throws Exception
    {
-      testChunks(true, true, 100, 50000, false, 1000, 0);
+      testChunks(true, true, false, 100, 50000, false, 1000, 0);
 
    }
 
    public void testSendfileMessageOnNullPersistence() throws Exception
    {
-      testChunks(false, true, 100, 50000, false, 1000, 0);
+      testChunks(false, true, false, 100, 50000, false, 1000, 0);
    }
 
    public void testSendfileMessageOnNullPersistenceSmallMessage() throws Exception
    {
-      testChunks(false, true, 100, 100, false, 1000, 0);
+      testChunks(false, true, false, 100, 100, true, 1000, 0);
    }
 
    public void testSendfileMessageSmallMessage() throws Exception
    {
-      testChunks(true, true, 100, 4, false, 1000, 0);
+      testChunks(true, true, false, 100, 4, false, 1000, 0);
 
    }
 
    public void testSendRegularMessageNullPersistence() throws Exception
    {
-      testChunks(false, false, 100, 100, false, 1000, 0);
+      testChunks(false, false, false, 100, 100, false, 1000, 0);
    }
 
    public void testSendRegularMessageNullPersistenceDelayed() throws Exception
    {
-      testChunks(false, false, 100, 100, false, 1000, 1000);
+      testChunks(false, false, false, 100, 100, false, 1000, 1000);
    }
 
    public void testSendRegularMessagePersistence() throws Exception
    {
-      testChunks(true, false, 100, 100, false, 1000, 0);
+      testChunks(true, false, false, 100, 100, false, 1000, 0);
    }
 
    public void testSendRegularMessagePersistenceDelayed() throws Exception
    {
-      testChunks(true, false, 100, 100, false, 1000, 1000);
+      testChunks(true, false, false, 100, 100, false, 1000, 1000);
    }
 
    public void testTwoBindingsTwoStartedConsumers() throws Exception

Added: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -0,0 +1,199 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.paging;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import junit.framework.AssertionFailedError;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.util.DataConstants;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A PagingServiceIntegrationTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Dec 5, 2008 8:25:58 PM
+ *
+ *
+ */
+public class PagingServiceIntegrationTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+   private static final Logger log = Logger.getLogger(PagingServiceIntegrationTest.class);
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   public void testSendReceivePaging() throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setPagingMaxGlobalSizeBytes(100 * 1024);
+      config.setPagingDefaultSize(10 * 1024);
+
+      MessagingService messagingService = createService(true, config, new HashMap<String, QueueSettings>());
+
+      messagingService.start();
+
+      final int numberOfIntegers = 256;
+
+      final int numberOfMessages = 10000;
+
+      try
+      {
+         ClientSessionFactory sf = createInVMFactory();
+
+         sf.setBlockOnNonPersistentSend(true);
+         sf.setBlockOnPersistentSend(true);
+         sf.setBlockOnAcknowledge(true);
+
+         ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+         session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         ByteBuffer ioBuffer = ByteBuffer.allocate(DataConstants.SIZE_INT * numberOfIntegers);
+
+         ClientMessage message = null;
+
+         MessagingBuffer body = null;
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+            for (int j = 1; j <= numberOfIntegers; j++)
+            {
+               bodyLocal.putInt(j);
+            }
+            bodyLocal.flip();
+
+            if (i == 0)
+            {
+               body = bodyLocal;
+            }
+
+            message = session.createClientMessage(true);
+            message.setBody(bodyLocal);
+            message.putIntProperty(new SimpleString("id"), i);
+
+            producer.send(message);
+         }
+
+         session.close();
+
+         messagingService.stop();
+
+         messagingService = createService(true, config, new HashMap<String, QueueSettings>());
+         messagingService.start();
+
+         sf = createInVMFactory();
+
+         session = sf.createSession(null, null, false, true, true, false, 0);
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         session.start();
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage message2 = consumer.receive(10000);
+
+            assertNotNull(message2);
+
+            assertEquals(i, ((Integer)message2.getProperty(new SimpleString("id"))).intValue());
+
+            message2.acknowledge();
+
+            assertNotNull(message2);
+
+            try
+            {
+               assertEqualsByteArrays(body.limit(), body.array(), message2.getBody().array());
+            }
+            catch (AssertionFailedError e)
+            {
+               log.info("Expected buffer:" + dumbBytesHex(body.array(), 40));
+               log.info("Arriving buffer:" + dumbBytesHex(message2.getBody().array(), 40));
+               throw e;
+            }
+         }
+
+         consumer.close();
+
+         session.close();
+      }
+      finally
+      {
+         try
+         {
+            messagingService.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -48,7 +48,7 @@
 
    public void testMessageChunkFilePersistence1G() throws Exception
    {
-      testChunks(true, true, 2, 268435456, false, 300000, 0, true);
+      testChunks(true, true, false, 2, 268435456, false, 300000, 0, true);
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -48,12 +48,12 @@
 
    public void testMessageChunkFilePersistence100M() throws Exception
    {
-      testChunks(true, true, 10, 26214400, false, 120000, 0);
+      testChunks(true, true, false, 10, 26214400, false, 120000, 0);
    }
 
    public void testMessageChunkFilePersistence1M() throws Exception
    {
-      testChunks(true, true, 1000, 262144, false, 120000, 0);
+      testChunks(true, true, false, 1000, 262144, false, 120000, 0);
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -322,6 +322,8 @@
                                                                                                              .array());
 
       assertEquals(0, buffers2.size());
+      
+      assertEquals(0, storeImpl.getAddressSize());
 
    }
 

Modified: trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -78,16 +78,23 @@
 
    // Protected -----------------------------------------------------
 
+
    protected void clearData()
    {
-      deleteAndCreateDir(getJournalDir());
-      deleteAndCreateDir(getBindingsDir());
-      deleteAndCreateDir(getPageDir());
-      deleteAndCreateDir(getLargeMessagesDir());
-      deleteAndCreateDir(getClientLargeMessagesDir());
-      deleteAndCreateDir(getTemporaryDir());
+      clearData(getTestDir());
    }
 
+   
+   protected void clearData(String testDir)
+   {
+      deleteAndCreateDir(getJournalDir(testDir));
+      deleteAndCreateDir(getBindingsDir(testDir));
+      deleteAndCreateDir(getPageDir(testDir));
+      deleteAndCreateDir(getLargeMessagesDir(testDir));
+      deleteAndCreateDir(getClientLargeMessagesDir(testDir));
+      deleteAndCreateDir(getTemporaryDir(testDir));
+   }
+
    protected void deleteAndCreateDir(String directory)
    {
       File file = new File(directory);
@@ -155,6 +162,7 @@
       Configuration configuration = new ConfigurationImpl();
       configuration.setSecurityEnabled(false);
       configuration.setJournalMinFiles(2);
+      configuration.setJournalDirectory(getJournalDir());
       configuration.setJournalFileSize(100 * 1024);
       configuration.setPagingDirectory(getPageDir());
       configuration.setLargeMessagesDirectory(getLargeMessagesDir());

Modified: trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java	2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java	2008-12-09 03:53:21 UTC (rev 5482)
@@ -69,22 +69,11 @@
    public static final String NETTY_ACCEPTOR_FACTORY = "org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory";
 
    public static final String NETTY_CONNECTOR_FACTORY = "org.jboss.messaging.integration.transports.netty.NettyConnectorFactory";
+
    // Attributes ----------------------------------------------------
 
    private String testDir = System.getProperty("java.io.tmpdir", "/tmp") + "/jbm-unit-test";
 
-   private String journalDir = testDir + "/journal";
-
-   private String bindingsDir = testDir + "/bindings";
-
-   private String pageDir = testDir + "/page";
-
-   private String largeMessagesDir = testDir + "/large-msg";
-
-   private String clientLargeMessagesDir = testDir + "/client-large-msg";
-
-   private String temporaryDir = testDir + "/temporary";
-
    // Static --------------------------------------------------------
 
    public static String dumpBytes(byte[] bytes)
@@ -198,7 +187,7 @@
    /**
     * @return the testDir
     */
-   public String getTestDir()
+   protected String getTestDir()
    {
       return testDir;
    }
@@ -206,51 +195,96 @@
    /**
     * @return the journalDir
     */
-   public String getJournalDir()
+   protected String getJournalDir()
    {
-      return journalDir;
+      return getJournalDir(testDir);
    }
 
+   protected String getJournalDir(String testDir)
+   {
+      return testDir + "/journal";
+   }
+
    /**
     * @return the bindingsDir
     */
-   public String getBindingsDir()
+   protected String getBindingsDir()
    {
-      return bindingsDir;
+      return getBindingsDir(testDir);
    }
 
    /**
+    * @return the bindingsDir
+    */
+   protected String getBindingsDir(String testDir)
+   {
+      return testDir + "/bindings";
+   }
+
+   /**
     * @return the pageDir
     */
-   public String getPageDir()
+   protected String getPageDir()
    {
-      return pageDir;
+      return getPageDir(testDir);
    }
 
    /**
+    * @return the pageDir
+    */
+   protected String getPageDir(String testDir)
+   {
+      return testDir + "/page";
+   }
+
+   /**
     * @return the largeMessagesDir
     */
-   public String getLargeMessagesDir()
+   protected String getLargeMessagesDir()
    {
-      return largeMessagesDir;
+      return getLargeMessagesDir(testDir);
    }
 
    /**
+    * @return the largeMessagesDir
+    */
+   protected String getLargeMessagesDir(String testDir)
+   {
+      return testDir + "/large-msg";
+   }
+
+   /**
     * @return the clientLargeMessagesDir
     */
-   public String getClientLargeMessagesDir()
+   protected String getClientLargeMessagesDir()
    {
-      return clientLargeMessagesDir;
+      return getClientLargeMessagesDir(testDir);
    }
 
    /**
+    * @return the clientLargeMessagesDir
+    */
+   protected String getClientLargeMessagesDir(String testDir)
+   {
+      return testDir + "/client-large-msg";
+   }
+
+   /**
     * @return the temporaryDir
     */
-   public String getTemporaryDir()
+   protected String getTemporaryDir()
    {
-      return temporaryDir;
+      return getTemporaryDir(testDir);
    }
 
+   /**
+    * @return the temporaryDir
+    */
+   protected String getTemporaryDir(String testDir)
+   {
+      return testDir + "/temp";
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------




More information about the jboss-cvs-commits mailing list