[jboss-cvs] JBoss Messaging SVN: r4715 - in branches: Branch_Message_Chunking and 19 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jul 23 07:27:03 EDT 2008


Author: ataylor
Date: 2008-07-23 07:27:02 -0400 (Wed, 23 Jul 2008)
New Revision: 4715

Added:
   branches/Branch_Message_Chunking/
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/message/MessageBodyEncoder.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/DecoderHandler.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/EncoderHandler.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/MessageCache.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/PacketAssembler.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/PacketFragment.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/MessagingBufferCache.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentBufferImpl.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentImpl.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaDecoderHandler.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaEncoderHandler.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketFragmentBuffer.java
   branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/PacketFragmentBufferImplTest.java
Modified:
   branches/Branch_Message_Chunking/build-messaging.xml
   branches/Branch_Message_Chunking/examples/jms/src/org/jboss/jms/example/QueueExample.java
   branches/Branch_Message_Chunking/messaging.ipr
   branches/Branch_Message_Chunking/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h
   branches/Branch_Message_Chunking/src/config/jbm-configuration.xml
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/client/ConnectionParams.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/config/Configuration.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/message/Message.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/MessagingCodec.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/Packet.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/MessagingCodecImpl.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupportImpl.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReceiveMessage.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossBytesMessage.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossMapMessage.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossMessage.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossMessageProducer.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossObjectMessage.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossStreamMessage.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossTextMessage.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/util/ByteBufferWrapper.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/util/ExpandingMessagingBuffer.java
   branches/Branch_Message_Chunking/src/main/org/jboss/messaging/util/MessagingBuffer.java
   branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java
   branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaSessionTest.java
   branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/SessionTestBase.java
   branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaConnectorTest.java
   branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaProtocolCodecFilterTest.java
   branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossObjectMessageTest.java
   branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossTextMessageTest.java
Log:
message chunking branch

Copied: branches/Branch_Message_Chunking (from rev 4713, trunk)

Modified: branches/Branch_Message_Chunking/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/build-messaging.xml	2008-07-23 11:27:02 UTC (rev 4715)
@@ -894,6 +894,7 @@
 
    <target name="runServer" depends="jar">
       <java classname="org.jboss.messaging.microcontainer.JBMBootstrapServer" fork="true">
+         <jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"/>
          <jvmarg value="-XX:+UseParallelGC"/>
          <jvmarg value="-Xms512M"/>
          <jvmarg value="-Xmx2048M"/>

Modified: branches/Branch_Message_Chunking/examples/jms/src/org/jboss/jms/example/QueueExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/QueueExample.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/examples/jms/src/org/jboss/jms/example/QueueExample.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -21,15 +21,7 @@
    */
 package org.jboss.jms.example;
 
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
+import javax.jms.*;
 import javax.naming.InitialContext;
 
 import org.jboss.messaging.core.logging.Logger;
@@ -55,17 +47,79 @@
          
          connection = cf.createConnection();
          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer producer = session.createProducer(queue);
-         Message message = session.createTextMessage("This is a text message!");
-         
+         final MessageProducer producer = session.createProducer(queue);
+         final BytesMessage message = session.createBytesMessage();
+         byte[] bytes = new byte[1024];
+         for(int i = 0; i < 1024; i++)
+         {
+            bytes[i] = (byte) i;
+         }
+         byte[] newbytes = new byte[bytes.length];
+         message.writeBytes(bytes);
          log.info("sending message to queue");
-         producer.send(message);
-         
+         new Thread(new Runnable()
+         {
+            public void run()
+            {
+
+               try
+               {
+                  producer.send(message);
+               }
+               catch (JMSException e)
+               {
+                  e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+               }
+            }
+         }).start();
+          new Thread(new Runnable()
+         {
+            public void run()
+            {
+
+               try
+               {
+                  producer.send(message);
+               }
+               catch (JMSException e)
+               {
+                  e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+               }
+            }
+         }).start();
+
          MessageConsumer messageConsumer = session.createConsumer(queue);
          connection.start();
-         TextMessage message2 = (TextMessage) messageConsumer.receive(5000);
+         BytesMessage message2 = (BytesMessage) messageConsumer.receive(50000000);
          log.info("message received from queue");
-         log.info("message = " + message2.getText());
+         message2.readBytes(newbytes);
+         //log.info("message = " + new String(newbytes));
+         for (int i = 0; i < newbytes.length; i++)
+         {
+            byte newbyte = newbytes[i];
+            System.out.print(newbyte);
+            System.out.print(" = ");
+            System.out.println(bytes[i]);
+            if(bytes[i] != newbyte)
+            {
+               throw new RuntimeException(i+"");
+            }
+         }
+         message2 = (BytesMessage) messageConsumer.receive(50000000);
+         log.info("message received from queue");
+         message2.readBytes(newbytes);
+         //log.info("message = " + new String(newbytes));
+         for (int i = 0; i < newbytes.length; i++)
+         {
+            byte newbyte = newbytes[i];
+            System.out.print(newbyte);
+            System.out.print(" = ");
+            System.out.println(bytes[i]);
+            if(bytes[i] != newbyte)
+            {
+               throw new RuntimeException(i+"");
+            }
+         }
       }
       catch (Exception e)
       {

Modified: branches/Branch_Message_Chunking/messaging.ipr
===================================================================
--- trunk/messaging.ipr	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/messaging.ipr	2008-07-23 11:27:02 UTC (rev 4715)
@@ -460,6 +460,7 @@
         <root url="jar://$PROJECT_DIR$/tools/lib/jbossbuild.jar!/" />
         <root url="jar:///home/andy/devtools/apache-ant-1.7.0/lib/ant-junit.jar!/" />
         <root url="jar:///home/andy/devtools/apache-ant-1.7.0/lib/ant.jar!/" />
+        <root url="file://$PROJECT_DIR$/examples/jms/config" />
       </CLASSES>
       <JAVADOC />
       <SOURCES>

Modified: branches/Branch_Message_Chunking/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h
===================================================================
--- trunk/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h	2008-07-23 11:27:02 UTC (rev 4715)
@@ -7,6 +7,7 @@
 #ifdef __cplusplus
 extern "C" {
 #endif
+/* Inaccessible static: _00024VRc */
 /* Inaccessible static: log */
 /* Inaccessible static: totalMaxIO */
 /* Inaccessible static: loaded */

Modified: branches/Branch_Message_Chunking/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/config/jbm-configuration.xml	2008-07-23 11:27:02 UTC (rev 4715)
@@ -28,7 +28,7 @@
       <remoting-port>5400</remoting-port>
 
       <!--  call timeout in milliseconds -->
-      <remoting-call-timeout>5000</remoting-call-timeout>
+      <remoting-call-timeout>50000</remoting-call-timeout>
 
       <!-- true to enable invm communication when the client and the server are in the same JVM.     -->
       <remoting-enable-invm-optimisation>true</remoting-enable-invm-optimisation>
@@ -54,7 +54,14 @@
       <!--How long to wait for a returning pong after sending a ping message to a client/server.-->
       <!-- If no pong is received after this time resources are cleaned up-->
       <remoting-ping-timeout>5000</remoting-ping-timeout>
-     
+
+      <!--The initial size of the buffer created for the transport-->
+      <remoting-initial-packet-fragment-size>2048</remoting-initial-packet-fragment-size>
+
+      <!--The maximum size of packet to send on the transport, except for the first packet, which uses the
+      remoting-initial-packet-fragment-size parameter. Setting to 0 means that packet chunking is disabled-->
+      <remoting-packet-fragment-size>1024</remoting-packet-fragment-size>
+
       <!--  if ssl is enabled, all remoting-ssl-* properties must be set -->
       <remoting-enable-ssl>false</remoting-enable-ssl>
 

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/client/ConnectionParams.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ConnectionParams.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/client/ConnectionParams.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -80,4 +80,12 @@
    String getTrustStorePassword();
 
    void setTrustStorePassword(String trustStorePassword);
+
+   int getPacketFragmentSize();
+
+   int getInitialPacketFragmentSize();
+
+   void setInitialPacketFragmentSize(int initialPacketFragmentSize);
+
+   void setPacketFragmentSize(int packetFragmentSize);
 }

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -32,7 +32,7 @@
 {
    //Constants ---------------------------------------------------------------------------------------
       
-   public static final int DEFAULT_PING_INTERVAL = 10000; // in ms
+   public static final int DEFAULT_PING_INTERVAL = 0; // in ms
    
    public static final int DEFAULT_PING_TIMEOUT = 5000; // ms
    
@@ -47,6 +47,10 @@
    public static final int DEFAULT_TCP_SEND_BUFFER_SIZE = 32 * 1024; // in bytes
    
    public static final boolean DEFAULT_SSL_ENABLED = false;
+
+   public static final Integer DEFAULT_INITIAL_PACKET_FRAGMENT_SIZE = 2048;
+
+   public static final Integer DEFAULT_PACKET_FRAGMENT_SIZE = 1024;
    
    public static final String SSL_KEYSTORE_PATH_PROPERTY_NAME = "jbm.remoting.ssl.keystore.path";
    
@@ -87,7 +91,12 @@
    private String trustStorePath;
    
    private String trustStorePassword;
-   
+
+   private int initialPacketFragmentSize = DEFAULT_INITIAL_PACKET_FRAGMENT_SIZE;
+
+   private int packetFragmentSize = DEFAULT_PACKET_FRAGMENT_SIZE;
+
+
    public long getCallTimeout()
    {
       return callTimeout;
@@ -217,7 +226,27 @@
    {
       this.trustStorePassword = trustStorePassword;
    }
-        
+
+   public void setInitialPacketFragmentSize(int initialPacketFragmentSize)
+   {
+      this.initialPacketFragmentSize = initialPacketFragmentSize;
+   }
+
+   public int getInitialPacketFragmentSize()
+   {
+      return initialPacketFragmentSize;
+   }
+
+   public void setPacketFragmentSize(int packetFragmentSize)
+   {
+      this.packetFragmentSize = packetFragmentSize;
+   }
+
+   public int getPacketFragmentSize()
+   {
+      return packetFragmentSize;
+   }
+
    public boolean equals(Object other)
    {
       if (other instanceof ConnectionParams == false)

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/config/Configuration.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -158,4 +158,8 @@
    boolean isCreateJournalDir();
    
    void setCreateJournalDir(boolean create);
+
+   int getInitialPacketFragmentSize();
+
+   int getPacketFragmentSize();
 }

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -94,9 +94,11 @@
    public static final long DEFAULT_AIO_TIMEOUT = 60000; // in ms
    
    public static final long DEFAULT_JOURNAL_TASK_PERIOD = 5000;
+
+   public static final int INITIAL_PACKET_FRAGMENT_SIZE = 2048;
    
+   public static final int PACKET_FRAGMENT_SIZE = 1024;
    
-   
    private static final long serialVersionUID = 4077088945050267843L;
 
    
@@ -153,6 +155,10 @@
    protected String host = DEFAULT_HOST;
    
    protected int port = DEFAULT_PORT;
+
+   protected int initialPacketFragmentSize = INITIAL_PACKET_FRAGMENT_SIZE;
+
+   protected int packetFragmentSize = PACKET_FRAGMENT_SIZE;
    
    protected final ConnectionParams defaultConnectionParams = new ConnectionParamsImpl();
    
@@ -451,7 +457,17 @@
       this.createJournalDir = create;
    }
 
-	public boolean isSecurityEnabled()
+   public int getInitialPacketFragmentSize()
+   {
+      return initialPacketFragmentSize;
+   }
+
+   public int getPacketFragmentSize()
+   {
+      return packetFragmentSize;
+   }
+
+   public boolean isSecurityEnabled()
 	{
 	   return securityEnabled;
 	}

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -114,6 +114,10 @@
 
       trustStorePassword = getString(e, "remoting-ssl-truststore-password", null);
 
+      initialPacketFragmentSize = getInteger(e, "remoting-initial-packet-fragment-size", ConnectionParamsImpl.DEFAULT_INITIAL_PACKET_FRAGMENT_SIZE);
+
+      packetFragmentSize = getInteger(e, "remoting-packet-fragment-size", ConnectionParamsImpl.DEFAULT_PACKET_FRAGMENT_SIZE);
+
       defaultConnectionParams.setCallTimeout(callTimeout);
       
       defaultConnectionParams.setInVMOptimisationEnabled(inVMOptimisationEnabled);
@@ -129,7 +133,11 @@
       defaultConnectionParams.setPingTimeout(pingTimeout);
       
       defaultConnectionParams.setSSLEnabled(sslEnabled);
-      
+
+      defaultConnectionParams.setInitialPacketFragmentSize(initialPacketFragmentSize);
+
+      defaultConnectionParams.setPacketFragmentSize(packetFragmentSize);
+
       // Persistence config
 
       bindingsDirectory = getString(e, "bindings-directory", bindingsDirectory);

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/message/Message.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/Message.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/message/Message.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -25,8 +25,8 @@
 import java.util.Set;
 
 import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.MessagingBuffer;
-import org.jboss.messaging.util.SimpleString;
 
 /**
  * A message is a routable instance that has a payload.
@@ -68,6 +68,8 @@
    
    int getEncodeSize();
 
+   void setMessageBodyEncoder(MessageBodyEncoder bodyEncoder);
+
    void encode(MessagingBuffer buffer);
    
    void decode(MessagingBuffer buffer);

Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/message/MessageBodyEncoder.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/message/MessageBodyEncoder.java	                        (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/message/MessageBodyEncoder.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,13 @@
+package org.jboss.messaging.core.message;
+
+import org.jboss.messaging.util.MessagingBuffer;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface MessageBodyEncoder
+{
+   void encode(MessagingBuffer buffer);
+
+   void decode(MessagingBuffer buffer, int len);
+}

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -28,12 +28,12 @@
 import static org.jboss.messaging.util.DataConstants.SIZE_LONG;
 
 import java.util.Set;
+import java.nio.ByteBuffer;
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.message.Message;
-import org.jboss.messaging.util.MessagingBuffer;
-import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.TypedProperties;
+import org.jboss.messaging.core.message.MessageBodyEncoder;
+import org.jboss.messaging.util.*;
 
 /**
  * A concrete implementation of a message
@@ -77,6 +77,8 @@
 
    private MessagingBuffer body;
 
+   private MessageBodyEncoder bodyEncoder = new SimpleBodyEncoder();
+
    // Constructors --------------------------------------------------
 
    protected MessageImpl()
@@ -131,8 +133,10 @@
       buff.putLong(timestamp);
       buff.putByte(priority);
       properties.encode(buff);
-      buff.putInt(body.limit());
-      buff.putBytes(body.array(), 0, body.limit());
+      int pos = buff.position();
+      buff.putInt(-1);
+      bodyEncoder.encode(buff);
+      buff.putInt(pos, buff.position() - pos - DataConstants.SIZE_INT);
    }
 
    public int getEncodeSize()
@@ -158,14 +162,16 @@
 
       properties.decode(buffer);
       int len = buffer.getInt();
+      bodyEncoder.decode(buffer, len);
 
-      //TODO - this can be optimised
-      byte[] bytes = new byte[len];
-      buffer.getBytes(bytes);
-      body = buffer.createNewBuffer(len);
-      body.putBytes(bytes);      
+
    }
-   
+
+   public void setMessageBodyEncoder(MessageBodyEncoder bodyEncoder)
+   {
+      this.bodyEncoder = bodyEncoder;
+   }
+
    public SimpleString getDestination()
    {
       return destination;
@@ -311,14 +317,31 @@
    {
       this.body = body;
    }
+
       
    // Public --------------------------------------------------------
    
    // Package protected ---------------------------------------------
-
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
 
-   // Inner classes -------------------------------------------------  
+   // Inner classes -------------------------------------------------
+
+   class SimpleBodyEncoder implements MessageBodyEncoder
+   {
+      public void encode(MessagingBuffer buffer)
+      {
+         buffer.putBytes(body.array(), 0, body.limit());
+      }
+
+      public void decode(MessagingBuffer buffer, int len)
+      {
+         body =  new ByteBufferWrapper(ByteBuffer.allocate(len));
+         byte[] bytes = new byte[len];
+         buffer.getBytes(bytes);
+         body.putBytes(bytes);
+         body.flip();
+      }
+   }
 }

Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/DecoderHandler.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/DecoderHandler.java	                        (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/DecoderHandler.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,11 @@
+package org.jboss.messaging.core.remoting;
+
+/**
+ * called by the PacketAssembler after a packet has been decoded
+ * 
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface DecoderHandler
+{
+   void handle(Packet packet);   
+}

Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/EncoderHandler.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/EncoderHandler.java	                        (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/EncoderHandler.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,13 @@
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.util.MessagingBuffer;
+
+/**
+ * called by the PacketAssembler once a message has been encoded.
+ * 
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface EncoderHandler
+{
+   void handle(MessagingBuffer buffer);
+}

Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/MessageCache.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/MessageCache.java	                        (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/MessageCache.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,15 @@
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.util.MessagingBuffer;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface MessageCache
+{
+   void cache(MessagingBuffer buffer, int length, long sessionId, int packetId);
+
+   MessagingBuffer retrieve(int length, long sessionId, int packetId, int correlationId);
+
+   void clear(long id);
+}

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/MessagingCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/MessagingCodec.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/MessagingCodec.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -24,6 +24,9 @@
 
 import org.jboss.messaging.util.MessagingBuffer;
 
+import java.util.Set;
+import java.util.List;
+
 /**
  * Used to encode/decode messages.
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/Packet.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Packet.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/Packet.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -24,6 +24,9 @@
 
 import org.jboss.messaging.util.MessagingBuffer;
 
+import java.util.Set;
+import java.util.List;
+
 /**
  * 
  * A Packet

Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/PacketAssembler.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/PacketAssembler.java	                        (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/PacketAssembler.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,14 @@
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.util.MessagingBuffer;
+import org.apache.mina.common.IoBuffer;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface PacketAssembler
+{
+   boolean assemble(MessagingBuffer buffer, DecoderHandler decoderHandler, long id) throws Exception;
+
+   void disAssemble(MessagingBuffer buffer, EncoderHandler outputHandler, Object message) throws Exception;
+}

Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/PacketFragment.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/PacketFragment.java	                        (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/PacketFragment.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,37 @@
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.util.MessagingBuffer;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface PacketFragment
+{
+   int getPacketId();
+
+   int getCorrelationId();
+
+   MessagingBuffer getMessagingBuffer();
+
+   int getLength();
+
+   boolean isLastPacket();
+
+   int getDataStartPosition();
+
+   void setDataStartPosition(int dataStartPosition);
+
+   int getEndPosition();
+
+   void reset();
+
+   void setPacketId(int packetId);
+
+   void setCorrelationId(int correlationId);
+
+   void setLastPacket(boolean lastPacket);
+
+   int getHeaderSize();
+
+   void setLength(int i);
+}

Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java	                        (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,84 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.core.remoting.MessageCache;
+import org.jboss.messaging.util.MessagingBuffer;
+import org.jboss.messaging.util.ByteBufferWrapper;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.List;                                                                                                     
+import java.util.HashMap;
+import java.util.ArrayList;
+
+/**
+ * An in memory cache for message buffer contents.
+ * 
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class InMemoryMessageCache implements MessageCache
+{
+   /** Cached buffers in arrival order, keyed first by session id and then by packet id. */
+   Map<Long, Map<Integer, List<MessagingBuffer>>> cache = new HashMap<Long, Map<Integer, List<MessagingBuffer>>>();
+
+   /**
+    * Reads {@code length} bytes from {@code buffer} into a private copy and appends that copy to
+    * the buffers stored for the given session and packet.
+    *
+    * @param buffer the buffer to read the bytes from
+    * @param length number of bytes to copy
+    * @param sessionId first-level cache key
+    * @param packetId second-level cache key
+    */
+   public void cache(MessagingBuffer buffer, int length, long sessionId, int packetId)
+   {
+      byte[] bytes = new byte[length];
+      buffer.getBytes(bytes);
+      getByteList(sessionId, packetId).add(new ByteBufferWrapper(ByteBuffer.wrap(bytes)));
+   }
+
+   /**
+    * Removes and returns the oldest buffer cached for the given session and packet. Map entries
+    * that become empty are dropped so finished packets and sessions leave nothing behind.
+    * {@code length} and {@code correlationId} are not used by this implementation.
+    *
+    * @return the oldest cached buffer for the packet
+    * @throws IllegalStateException if nothing is cached for the given session and packet
+    */
+   public MessagingBuffer retrieve(int length, long sessionId, int packetId, int correlationId)
+   {
+      // Look up without getByteList(): a miss must not create (and then leak) empty entries.
+      Map<Integer, List<MessagingBuffer>> sessionMap = cache.get(sessionId);
+      List<MessagingBuffer> buffers = sessionMap == null ? null : sessionMap.get(packetId);
+      if(buffers == null || buffers.isEmpty())
+      {
+         throw new IllegalStateException("No cached buffer for session " + sessionId + ", packet " + packetId);
+      }
+      MessagingBuffer buffer = buffers.remove(0);
+      if(buffers.isEmpty())
+      {
+         sessionMap.remove(packetId);
+         if(sessionMap.isEmpty())
+         {
+            cache.remove(sessionId);
+         }
+      }
+      return buffer;
+   }
+
+   /**
+    * Discards everything cached for the given session.
+    *
+    * @param id the session id
+    */
+   public void clear(long id)
+   {
+      cache.remove(id);
+   }
+
+
+   /**
+    * Returns the list of buffers for the given session and packet, creating the session map and
+    * the list on first use.
+    */
+   protected List<MessagingBuffer> getByteList(long sessionId, int packetId)
+   {
+      Map<Integer, List<MessagingBuffer>> sessionMap = cache.get(sessionId);
+      if(sessionMap == null)
+      {
+         sessionMap = new HashMap<Integer, List<MessagingBuffer>>();
+         cache.put(sessionId, sessionMap);
+      }
+      List<MessagingBuffer> buffers = sessionMap.get(packetId);
+      if(buffers == null)
+      {
+         buffers = new ArrayList<MessagingBuffer>();
+         sessionMap.put(packetId, buffers);
+      }
+      return buffers;
+   }
+
+}

Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/MessagingBufferCache.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/MessagingBufferCache.java	                        (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/MessagingBufferCache.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,37 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.util.MessagingBuffer;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class MessagingBufferCache extends InMemoryMessageCache
+{
+   /**
+    * Caches a slice of {@code buffer} instead of a copy and then skips {@code length} bytes in
+    * {@code buffer}.
+    * <p>
+    * NOTE(review): the cached slice presumably shares its content with {@code buffer}; confirm the
+    * caller does not reuse or overwrite the source buffer while the fragment is still cached.
+    *
+    * @param buffer the buffer positioned at the start of the fragment data
+    * @param length number of bytes belonging to the fragment
+    * @param sessionId first-level cache key
+    * @param packetId second-level cache key
+    */
+   public void cache(MessagingBuffer buffer, int length, long sessionId, int packetId)
+   {
+      MessagingBuffer buff = buffer.slice();
+      // The slice covers everything left in the source buffer; cap it at this fragment's bytes so
+      // the cached view does not expose data belonging to whatever follows in the buffer.
+      buff.limit(length);
+      buffer.position(buffer.position() + length);
+      getByteList(sessionId, packetId).add(buff);
+   }
+}

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/MessagingCodecImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/MessagingCodecImpl.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/MessagingCodecImpl.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -25,11 +25,15 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.MessagingCodec;
 import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.PacketFragment;
 import org.jboss.messaging.core.remoting.impl.wireformat.*;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.*;
 import static org.jboss.messaging.util.DataConstants.SIZE_INT;
 import org.jboss.messaging.util.MessagingBuffer;
 
+import java.util.Set;
+import java.util.List;
+
 /**
  * A MessagingCodec
  *
@@ -51,22 +55,7 @@
 
    public Packet decode(final MessagingBuffer in) throws Exception
    {
-      int start = in.position();
 
-      if (in.remaining() <= SIZE_INT)
-      {
-         return null;
-      }
-
-      int length = in.getInt();
-
-      if (in.remaining() < length)
-      {
-         in.position(start);
-
-         return null;
-      }
-
       byte packetType = in.getByte();
 
       Packet packet;

Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java	                        (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,169 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.core.remoting.*;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketFragmentBuffer;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.util.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
+import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+
+import java.util.Map;
+import java.util.List;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class PacketAssemblerImpl implements PacketAssembler
+{
+   private static final Logger log = Logger.getLogger(PacketAssemblerImpl.class);
+   /** Fragments received so far, keyed by the id passed to assemble() and then by packet id. */
+   Map<Long, Map<Integer, List<PacketFragment>>> allFragments = new HashMap<Long, Map<Integer, List<PacketFragment>>>();
+   /** Source of packet ids for outgoing chunked messages. */
+   private AtomicInteger packetId = new AtomicInteger(0);
+   private MessagingCodec messagingCodec = new MessagingCodecImpl();
+   /** Default cache; kept when the constructor is given {@code null}. */
+   private MessageCache messageCache = new InMemoryMessageCache();
+   private int packetFragmentSize;
+   private boolean useMessageChunking = false;
+
+
+   /**
+    * @param packetFragmentSize size used when creating fragment buffers; chunking is enabled only
+    *        when this is greater than zero
+    * @param messageCache cache for fragments that arrive before the last fragment of their packet;
+    *        if {@code null} the default in-memory cache is used
+    */
+   public PacketAssemblerImpl(int packetFragmentSize, MessageCache messageCache)
+   {
+      this.packetFragmentSize = packetFragmentSize;
+      useMessageChunking = this.packetFragmentSize > 0;
+      if (messageCache != null)
+      {
+         this.messageCache = messageCache;
+      }
+   }
+
+   /**
+    * Reads one length-prefixed frame from {@code buffer}, if a complete one is available.
+    *
+    * @return {@code false}, with the buffer position unchanged, when the frame is not complete yet;
+    *         {@code true} once a frame has been consumed
+    */
+   public boolean assemble(MessagingBuffer buffer, DecoderHandler decoderHandler, long id) throws Exception
+   {
+      int start = buffer.position();
+
+      if (buffer.remaining() <= SIZE_INT)
+      {
+         return false;
+      }
+
+      int length = buffer.getInt();
+
+      if (buffer.remaining() < length)
+      {
+         // incomplete frame: rewind over the length int so it is read again next time
+         buffer.position(start);
+
+         return false;
+      }
+      if (useMessageChunking)
+      {
+         assembleMulti(buffer, decoderHandler, id, length);
+      }
+      else
+      {
+         assembleSingle(buffer, decoderHandler, id, length);
+      }
+
+      return true;
+   }
+
+
+   /**
+    * Encodes {@code message} and passes the resulting buffer(s) to {@code outputHandler}, as a
+    * series of fragments when chunking is enabled and as a single frame otherwise.
+    */
+   public void disAssemble(MessagingBuffer buffer, EncoderHandler outputHandler, Object message) throws Exception
+   {
+      if (useMessageChunking)
+      {
+         disasembleMulti(buffer, outputHandler, message);
+      }
+      else
+      {
+         disasembleSingle(buffer, outputHandler, message);
+      }
+   }
+
+   /** Returns the fragment list for the given id and packet, creating the map entries on first use. */
+   private List<PacketFragment> getFragments(long id, int packetId)
+   {
+      Map<Integer, List<PacketFragment>> sessionFragments = allFragments.get(id);
+      if(sessionFragments == null)
+      {
+         sessionFragments = new HashMap<Integer, List<PacketFragment>>();
+         allFragments.put(id, sessionFragments);
+      }
+      List<PacketFragment> fragments = sessionFragments.get(packetId);
+      if(fragments == null)
+      {
+         fragments = new ArrayList<PacketFragment>();
+         sessionFragments.put(packetId, fragments);
+      }
+      return fragments;
+   }
+
+
+   /** Encodes the message through a fragmenting buffer and hands every fragment to the handler in order. */
+   private void disasembleMulti(MessagingBuffer buffer, EncoderHandler outputHandler, Object message)
+         throws Exception
+   {
+      PacketFragmentBuffer buff = new PacketFragmentBufferImpl(packetId.getAndIncrement(), buffer, packetFragmentSize);
+      messagingCodec.encode(buff, message);
+      buff.prepare();
+      for (PacketFragment packetFragment : buff.getPacketFragments())
+      {
+         outputHandler.handle(packetFragment.getMessagingBuffer());
+      }
+   }
+
+   /** Encodes the message as one frame: a length int followed by the encoded message. */
+   private void disasembleSingle(MessagingBuffer buffer, EncoderHandler outputHandler, Object message)
+         throws Exception
+   {
+      // placeholder for the length, filled in once the encoded size is known
+      buffer.putInt(-1);
+      messagingCodec.encode(buffer, message);
+      //The length doesn't include the length int itself
+      int len = buffer.position() - DataConstants.SIZE_INT;
+      buffer.putInt(0, len);
+      buffer.flip();
+      outputHandler.handle(buffer);
+   }
+
+   /**
+    * Handles one fragment frame. The fragment header (last-fragment flag, packet id, correlation
+    * id) is read first. The last fragment triggers decoding of the whole packet from all fragments
+    * collected for it; any other fragment has its data put into the message cache for later.
+    */
+   private void assembleMulti(MessagingBuffer buffer, DecoderHandler decoderHandler, long id, int length)
+         throws Exception
+   {
+      boolean lastPacket = buffer.getBoolean();
+      int fragmentPacketId = buffer.getInt();
+      int correlationId = buffer.getInt();
+
+      if(lastPacket)
+      {
+         PacketFragmentImpl fragment = new PacketFragmentImpl(fragmentPacketId, lastPacket, correlationId, length, buffer);
+         List<PacketFragment> fragments = getFragments(id, fragmentPacketId);
+         fragments.add(fragment);
+         // the packet is complete: forget its fragments and drop the per-id map once it is empty
+         Map<Integer, List<PacketFragment>> sessionFragments = allFragments.get(id);
+         sessionFragments.remove(fragmentPacketId);
+         if(sessionFragments.isEmpty())
+         {
+            allFragments.remove(id);
+         }
+         PacketFragmentBuffer buff = new PacketFragmentBufferImpl(fragments, packetFragmentSize);
+         Packet packet = messagingCodec.decode(buff);
+         decoderHandler.handle(packet);
+      }
+      else
+      {
+         //if we are part of a multi part packet then we need to be cached for later use
+         PacketFragmentImpl fragment = new PacketFragmentImpl(fragmentPacketId, lastPacket, correlationId, length, messageCache,id);
+         getFragments(id, fragmentPacketId).add(fragment);
+         // the header has already been read from the buffer, only the data bytes are left
+         messageCache.cache(buffer, length - fragment.getHeaderSize(), id, fragmentPacketId);
+      }
+   }
+
+   /** Decodes a whole packet straight from the buffer and passes it to the handler. */
+   private void assembleSingle(MessagingBuffer buffer, DecoderHandler decoderHandler, long id, int length)
+         throws Exception
+   {
+      Packet packet = messagingCodec.decode(buffer);
+      decoderHandler.handle(packet);
+   }
+}

Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentBufferImpl.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentBufferImpl.java	                        (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentBufferImpl.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,558 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.util.DataConstants;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.MessagingBuffer;
+import static org.jboss.messaging.util.DataConstants.NULL;
+import static org.jboss.messaging.util.DataConstants.NOT_NULL;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketFragmentBuffer;
+import org.jboss.messaging.core.remoting.PacketFragment;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
+/**
+ * A MessagingBuffer that can be used to break up the buffers into smaller pieces.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class PacketFragmentBufferImpl implements PacketFragmentBuffer
+{
+
+   private static final Charset utf8 = Charset.forName("UTF-8");
+
+   private List<PacketFragment> packetFragments;
+   private int currentPos = 0;
+   private int size = 0;
+   private int correlationId = 0;
+   private int packetFragmentSize;
+
+   /**
+    * Creates a buffer for writing. {@code buffer} becomes the first fragment (correlation id 0) and
+    * its frame header is written immediately: a placeholder length, the last-fragment flag
+    * (initially false), the packet id and the correlation id.
+    *
+    * @param packetId id shared by all fragments created by this buffer
+    * @param buffer the buffer backing the first fragment
+    * @param packetFragmentSize size requested for every further fragment buffer
+    */
+   public PacketFragmentBufferImpl(int packetId, MessagingBuffer buffer, int packetFragmentSize)
+   {
+      // Set first: the header writes below can call setNextFragment(), which reads this field.
+      this.packetFragmentSize = packetFragmentSize;
+      packetFragments = new ArrayList<PacketFragment>();
+      packetFragments.add(new PacketFragmentImpl(packetId, false, correlationId++, buffer));
+      putInt(-1);
+      putBoolean(false);
+      putInt(getCurrentFragment().getPacketId());
+      putInt(getCurrentFragment().getCorrelationId());
+      getCurrentFragment().setDataStartPosition(getCurrentBuffer().position());
+   }
+
+   public PacketFragmentBufferImpl(List<PacketFragment> packetFragments, int packetFragmentSize)
+   {
+      if(packetFragments == null || packetFragments.size() == 0)
+      {
+         throw new IllegalArgumentException("packet fragments must not be null or empty");
+      }
+      this.packetFragments = packetFragments;
+      currentPos = 0;
+      this.packetFragmentSize = packetFragmentSize;
+      getCurrentFragment().reset();
+   }
+
+   public void putFloat(float val)
+   {
+      checkBufferSpace(DataConstants.SIZE_FLOAT);
+      getCurrentBuffer().putFloat(val);
+   }
+
+   public void putInt(int i)
+   {
+      checkBufferSpace(DataConstants.SIZE_INT);
+      getCurrentBuffer().putInt(i);
+   }
+
+   public void putByte(byte b)
+   {
+      checkBufferSpace(DataConstants.SIZE_BYTE);
+      getCurrentBuffer().putByte(b);
+   }
+
+   public void putLong(long l)
+   {
+      checkBufferSpace(DataConstants.SIZE_LONG);
+      getCurrentBuffer().putLong(l);
+   }
+
+   public void putBoolean(boolean b)
+   {
+      checkBufferSpace(DataConstants.SIZE_BOOLEAN);
+      getCurrentBuffer().putBoolean(b);
+   }
+
+   public void putNullableString(String nullableString)
+   {
+      if (nullableString == null)
+		{
+			putByte(NULL);
+		}
+		else
+		{
+			putByte(NOT_NULL);
+
+			putString(nullableString);
+		}
+   }
+
+   public void putSimpleString(SimpleString string)
+   {
+      byte[] data = string.getData();
+
+   	putInt(data.length);
+   	putBytes(data);
+   }
+
+   public void putNullableSimpleString(SimpleString string)
+   {
+      if (string == null)
+   	{
+   		putByte(NULL);
+   	}
+   	else
+   	{
+   	   putByte(NOT_NULL);
+   		putSimpleString(string);
+   	}
+   }
+
+   /**
+    * Writes {@code i1} bytes of {@code bytes}, starting at offset {@code i}, spilling over into new
+    * fragments when the current fragment buffer cannot hold them all.
+    *
+    * @param bytes source array
+    * @param i offset of the first byte to write
+    * @param i1 number of bytes to write
+    */
+   public void putBytes(byte[] bytes, int i, int i1)
+   {
+      // i1 is a length, not an end index, so the space check must not subtract the offset
+      if(hasWriteSpace(i1))
+      {
+         getCurrentBuffer().putBytes(bytes, i, i1);
+      }
+      else
+      {
+         int written = 0;
+         while(written < i1)
+         {
+            int left = getCurrentBuffer().limit() - getCurrentBuffer().position();
+            // fill what is left of the current fragment, or less if that completes the write
+            int towrite = Math.min(left, i1 - written);
+            getCurrentBuffer().putBytes(bytes, i + written, towrite);
+            written += towrite;
+            if(written < i1)
+            {
+               setNextFragment();
+            }
+         }
+
+      }
+   }
+
+   public void putBytes(byte[] bytes)
+   {
+      putBytes(bytes, 0, bytes.length);
+   }
+
+   public void putShort(short val)
+   {
+      checkBufferSpace(DataConstants.SIZE_SHORT);
+      getCurrentBuffer().putShort(val);
+   }
+
+   public void putDouble(double val)
+   {
+      checkBufferSpace(DataConstants.SIZE_DOUBLE);
+      getCurrentBuffer().putDouble(val);
+   }
+
+   public void putChar(char val)
+   {
+      checkBufferSpace(DataConstants.SIZE_CHAR);
+      getCurrentBuffer().putChar(val);
+   }
+
+   public void putUTF(String str)
+   {
+      //TODO This is quite inefficient - can be improved using a method similar to what MINA IOBuffer does
+		//(putPrefixedString)
+		ByteBuffer bb = utf8.encode(str);
+   	putInt(bb.limit() - bb.position());
+   	putBytes(bb.array());
+   }
+
+   /**
+    * Absolute write: stores {@code val} at {@code pos} without going through the current write
+    * position. The target fragment is chosen by getBufferPosition(pos) and the offset inside it is
+    * {@code pos} minus the size of the fragments before it.
+    * <p>
+    * NOTE(review): unlike putInt(int, int) this does not add the fragment header size and SIZE_INT
+    * to the offset, so the two methods interpret {@code pos} differently - confirm which is intended.
+    */
+   public void putBoolean(int pos, boolean val)
+   {
+      int bufferPos = getBufferPosition(pos);
+      int actualPos = pos - getPreviousBuffersSize(bufferPos);
+      packetFragments.get(bufferPos).getMessagingBuffer().putBoolean(actualPos, val);
+   }
+
+   public void putInt(int pos, int val)
+   {
+      int bufferPos = getBufferPosition(pos);
+      int actualPos = pos - getPreviousBuffersSize(bufferPos) + getCurrentFragment().getHeaderSize() + DataConstants.SIZE_INT;
+      packetFragments.get(bufferPos).getMessagingBuffer().putInt(actualPos, val);
+   }
+
+
+   public void putString(String string)
+   {
+      putInt(string.length());
+
+		for (int i = 0; i < string.length(); i++)
+		{
+			putChar(string.charAt(i));
+		}
+   }
+
+   public byte[] array()
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public int capacity()
+   {
+      return 0;
+   }
+
+   public void limit(int limit)
+   {
+
+   }
+
+   /**
+    * Moves to {@code position}: the fragment chosen by getBufferPosition(position) becomes the
+    * current fragment and its buffer is positioned at {@code position} minus the size of the
+    * fragments before it.
+    * <p>
+    * NOTE(review): position() subtracts the fragment header size and SIZE_INT from the buffer
+    * position, but nothing is added back here, so position(position()) does not appear to return
+    * to the same place - confirm the intended meaning of {@code position}.
+    */
+   public void position(int position)
+   {
+      int bufferPos = getBufferPosition(position);
+      int actualPos = position - getPreviousBuffersSize(bufferPos);
+      currentPos = bufferPos;
+      getCurrentBuffer().position(actualPos);
+   }
+
+   public int position()
+   {
+      return getPreviousBuffersSize() + (getCurrentBuffer().position() - getCurrentFragment().getHeaderSize() - DataConstants.SIZE_INT);
+   }
+
+   public MessagingBuffer slice()
+   {
+      return null;
+   }
+
+   public MessagingBuffer createNewBuffer(int len)
+   {
+      return new PacketFragmentBufferImpl(getCurrentFragment().getPacketId(),getCurrentBuffer().createNewBuffer(len), packetFragmentSize);
+   }
+
+
+   public String getString()
+   {
+      int len = getInt();
+
+   	char[] chars = new char[len];
+
+      for (int i = 0; i < len; i++)
+      {
+      	chars[i] = getChar();
+      }
+
+      return new String(chars);
+   }
+
+   public void getBytes(byte[] value, int i, int read)
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public String getUTF() throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void flip()
+   {
+      for (int i = 0; i < packetFragments.size(); i++)
+      {
+         packetFragments.get(i).getMessagingBuffer().flip();
+      }
+   }
+
+   public void rewind()
+   {
+      for (int i = 0; i < packetFragments.size(); i++)
+      {
+         packetFragments.get(i).getMessagingBuffer().rewind();
+      }
+      currentPos = 0;
+   }
+
+
+   public int getInt()
+   {
+      checkReadSize(DataConstants.SIZE_INT);
+      return getCurrentBuffer().getInt();
+   }
+
+   public long getLong()
+   {
+      checkReadSize(DataConstants.SIZE_LONG);
+      return getCurrentBuffer().getLong();
+   }
+
+   public short getShort()
+   {
+      checkReadSize(DataConstants.SIZE_SHORT);
+      return getCurrentBuffer().getShort();
+   }
+
+   public boolean getBoolean()
+   {
+      checkReadSize(DataConstants.SIZE_BOOLEAN);
+      return getCurrentBuffer().getBoolean();
+   }
+
+   public String getNullableString()
+   {
+      byte check = getByte();
+
+		if (check == NULL)
+		{
+			return null;
+		}
+		else
+		{
+			return getString();
+		}
+   }
+
+   public SimpleString getSimpleString()
+   {
+      int len = getInt();
+
+   	byte[] data = new byte[len];
+   	getBytes(data);
+
+   	return new SimpleString(data);
+   }
+
+    public SimpleString getNullableSimpleString()
+   {
+      int b = getByte();
+   	if (b == NULL)
+   	{
+   	   return null;
+   	}
+   	else
+   	{
+         return getSimpleString();
+   	}
+   }
+
+   public byte getByte()
+   {
+      checkReadSize(DataConstants.SIZE_BYTE);
+      return getCurrentBuffer().getByte();
+   }
+
+   /**
+    * Fills {@code data} completely, continuing in the following fragments when the current one
+    * does not hold enough bytes.
+    *
+    * @param data the array to fill
+    */
+   public void getBytes(byte[] data)
+   {
+      int left = getCurrentFragment().getEndPosition() -  getCurrentBuffer().position();
+      if(data.length <= left)
+      {
+         getCurrentBuffer().getBytes(data);
+      }
+      else
+      {
+         int size = data.length;
+         int read = 0;
+         while (read < size)
+         {
+            left = getCurrentFragment().getEndPosition() -  getCurrentBuffer().position();
+            // never take more than is still wanted: the final fragment usually holds more than
+            // the tail of data, and reading all of it would run past the end of the array
+            int toRead = Math.min(left, size - read);
+
+            getCurrentBuffer().getBytes(data, read, toRead);
+            read += toRead;
+            if(read < size)
+            {
+               currentPos++;
+               getCurrentFragment().reset();
+            }
+
+         }
+
+      }
+   }
+
+
+   public float getFloat()
+   {
+      checkReadSize(DataConstants.SIZE_FLOAT);
+      return getCurrentBuffer().getFloat();
+   }
+
+   public double getDouble()
+   {
+      checkReadSize(DataConstants.SIZE_DOUBLE);
+      return getCurrentBuffer().getDouble();
+   }
+
+   public char getChar()
+   {
+      checkReadSize(DataConstants.SIZE_CHAR);
+      return getCurrentBuffer().getChar();
+   }
+
+   public List<PacketFragment> getPacketFragments()
+   {
+      return packetFragments;
+   }
+
+   /**
+    * Finishes every fragment after encoding: the length placeholder at offset 0 is replaced by the
+    * data length plus the header size, the last fragment has its last-fragment flag set to true,
+    * and each fragment buffer is flipped.
+    */
+   public void prepare()
+   {
+      final int lastIndex = packetFragments.size() - 1;
+      int index = 0;
+      for (PacketFragment packetFragment : packetFragments)
+      {
+         MessagingBuffer fragmentBuffer = packetFragment.getMessagingBuffer();
+         int dataLength = fragmentBuffer.position() - packetFragment.getDataStartPosition();
+         fragmentBuffer.putInt(0, dataLength + getCurrentFragment().getHeaderSize());
+         if(index == lastIndex)
+         {
+            // the flag sits directly after the length int
+            fragmentBuffer.putBoolean(DataConstants.SIZE_INT, true);
+         }
+         fragmentBuffer.flip();
+         index++;
+      }
+   }
+
+   /**
+    * Reads one unsigned byte, moving on to the next fragment first if the current one has no
+    * byte left.
+    *
+    * @return the value read from the current fragment buffer
+    */
+   public short getUnsignedByte()
+   {
+      // one byte is consumed; asking for SIZE_SHORT would skip the last byte of a fragment
+      checkReadSize(DataConstants.SIZE_BYTE);
+      return getCurrentBuffer().getUnsignedByte();
+   }
+
+   /**
+    * Reads one unsigned short, moving on to the next fragment first if the current one does not
+    * have two bytes left.
+    *
+    * @return the value read from the current fragment buffer
+    */
+   public int getUnsignedShort()
+   {
+      // two bytes are consumed, and the short (not the byte) accessor must be used
+      checkReadSize(DataConstants.SIZE_SHORT);
+      return getCurrentBuffer().getUnsignedShort();
+   }
+
+   public int remaining()
+   {
+      return getPostBuffersSize() + getCurrentBuffer().remaining();
+   }
+
+   public int limit()
+   {
+      return getPreviousBuffersSize() + getCurrentBuffer().limit();
+   }
+
+   // Private -------------------------------------------------------
+
+   private void checkReadSize(int size)
+   {
+      if(!hasReadSpace(size))
+      {
+         currentPos++;
+         getCurrentFragment().reset();
+      }
+   }
+
+   private void checkBufferSpace(int size)
+   {
+      if(!hasWriteSpace(size))
+      {
+         setNextFragment();
+      }
+   }
+
+   private void setNextFragment()
+   {
+      getCurrentFragment().setLength(getCurrentBuffer().position());
+      packetFragments.add(new PacketFragmentImpl(getCurrentFragment().getPacketId(), false,  getCurrentFragment().getCorrelationId() + 1,
+            getCurrentBuffer().createNewBuffer(packetFragmentSize)));
+      currentPos++;
+      putInt(-1);
+      putBoolean(false);
+      putInt(getCurrentFragment().getPacketId());
+      putInt(getCurrentFragment().getCorrelationId());
+      getCurrentFragment().setDataStartPosition(getCurrentBuffer().position());
+   }
+
+   private boolean hasWriteSpace(int size)
+   {
+      int left = getCurrentBuffer().limit() - getCurrentBuffer().position();
+      return size<=left;
+   }
+
+   private boolean hasReadSpace(int size)
+   {
+      return getCurrentBuffer().position() + size <= getCurrentFragment().getEndPosition();
+   }
+
+   private PacketFragment getCurrentFragment()
+   {
+      return packetFragments.get(currentPos);
+   }
+
+   private MessagingBuffer getCurrentBuffer()
+   {
+      return getCurrentFragment().getMessagingBuffer();
+   }
+
+   private int getPreviousBuffersSize()
+   {
+      return getPreviousBuffersSize(currentPos);
+   }
+
+   private int getPostBuffersSize()
+   {
+      return getPostBuffersSize(currentPos);
+   }
+
+    private int getPreviousBuffersSize(int currentPos)
+   {
+      int ret = 0;
+      for(int i = 0; i < currentPos; i++)
+      {
+         ret+=packetFragments.get(i).getLength() - getCurrentFragment().getHeaderSize() - DataConstants.SIZE_INT;
+      }
+      return ret;
+   }
+
+   private int getPostBuffersSize(int currentPos)
+   {
+      int ret = 0;
+      for(int i = currentPos + 1; i < packetFragments.size(); i++)
+      {
+         ret+=packetFragments.get(i).getLength();
+      }
+      return ret;
+   }
+
+   /**
+    * Returns the index of the fragment that contains {@code position}.
+    * <p>
+    * Fragment sizes are counted the same way as in getPreviousBuffersSize(int), that is the
+    * recorded length minus the header size and the length int, so that
+    * {@code position - getPreviousBuffersSize(index)} is the offset inside the returned fragment.
+    * Positions beyond all earlier fragments map to the last fragment, whose length is not
+    * necessarily recorded yet while writing.
+    *
+    * @param position the position to locate
+    * @return index into packetFragments
+    */
+   private int getBufferPosition(int position)
+   {
+      int start = 0;
+      int last = packetFragments.size() - 1;
+      for (int i = 0; i < last; i++)
+      {
+         int dataLength = packetFragments.get(i).getLength() - getCurrentFragment().getHeaderSize() - DataConstants.SIZE_INT;
+         if(position < start + dataLength)
+         {
+            return i;
+         }
+         start += dataLength;
+      }
+      return last;
+   }
+}

Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentImpl.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentImpl.java	                        (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentImpl.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,149 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.util.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
+import org.jboss.messaging.core.remoting.PacketFragment;
+import org.jboss.messaging.core.remoting.MessageCache;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
+
+/**
+ * A fragment of a full packet that holds a portion of a full message within a buffer.
+ * 
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class PacketFragmentImpl implements PacketFragment
+{
+   private int length;
+   private int packetId;
+   private int correlationId;
+   private boolean lastPacket;
+   private long sessionId;
+   private int dataStartPosition = -1;
+   private MessagingBuffer messagingBuffer;
+   private MessageCache messageCache;
+
+   public PacketFragmentImpl(int packetId, boolean lastPacket, int correlationId, int length, MessageCache messageCache, long id)
+   {
+      this.packetId = packetId;
+      this.lastPacket = lastPacket;
+      this.correlationId = correlationId;
+      this.messageCache = messageCache;
+      this.length = length;
+      this.sessionId = id;
+      dataStartPosition = 0;
+   }
+
+    public PacketFragmentImpl(int packetId, boolean lastPacket, int correlationId, int length, MessagingBuffer messagingBuffer)
+   {
+      this.packetId = packetId;
+      this.lastPacket = lastPacket;
+      this.correlationId = correlationId;
+      this.length = length;
+      this.messagingBuffer = messagingBuffer;
+      dataStartPosition = messagingBuffer.position();
+   }
+
+   public PacketFragmentImpl(int packetId, boolean lastPacket, int correlationId, MessagingBuffer messagingBuffer)
+   {
+      this.packetId = packetId;
+      this.lastPacket = lastPacket;
+      this.correlationId = correlationId;
+      this.messagingBuffer = messagingBuffer;
+   }
+
+   public int getHeaderSize()
+   {
+      return (DataConstants.SIZE_INT * 2) + DataConstants.SIZE_BOOLEAN;
+   }
+   public int getPacketId()
+   {
+      return packetId;
+   }
+
+   public void setPacketId(int packetId)
+   {
+      this.packetId = packetId;
+   }
+
+   public void setCorrelationId(int correlationId)
+   {
+      this.correlationId = correlationId;
+   }
+
+   public void setLastPacket(boolean lastPacket)
+   {
+      this.lastPacket = lastPacket;
+   }
+
+   public int getCorrelationId()
+   {
+      return correlationId;
+   }
+
+   public int getLength()
+   {
+      return length;
+   }
+
+   public void setLength(int i)
+   {
+      length = i;
+   }
+
+   public boolean isLastPacket()
+   {
+      return lastPacket;
+   }
+
+   public MessagingBuffer getMessagingBuffer()
+   {
+      if(messagingBuffer == null)
+      {
+         messagingBuffer = messageCache.retrieve(length - 9, sessionId, packetId, correlationId);
+      }
+      return messagingBuffer;
+   }
+
+   public int getDataStartPosition()
+   {
+      return dataStartPosition;
+   }
+
+   public void setDataStartPosition(int dataStartPosition)
+   {
+      this.dataStartPosition = dataStartPosition;
+   }
+
+   public int getEndPosition()
+   {
+      return getDataStartPosition() + getLength() - getHeaderSize();
+   }
+   public void reset()
+   {
+      if(messagingBuffer != null)
+      {
+         messagingBuffer.position(dataStartPosition);
+      }
+   }
+}

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -23,6 +23,7 @@
 package org.jboss.messaging.core.remoting.impl.mina;
 
 import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.jboss.messaging.core.remoting.MessageCache;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -34,7 +35,7 @@
  */
 public interface FilterChainSupport
 {
-   void addCodecFilter(final DefaultIoFilterChainBuilder filterChain);
+   void addCodecFilter(final DefaultIoFilterChainBuilder filterChain, int initialPacketFragmentSize, int packetFragmentSize, MessageCache messageCache);
 
    void addSSLFilter(
          final DefaultIoFilterChainBuilder filterChain, final boolean client,

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupportImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupportImpl.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupportImpl.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -30,6 +30,8 @@
 import org.apache.mina.filter.ssl.SslFilter;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
+import org.jboss.messaging.core.remoting.impl.InMemoryMessageCache;
+import org.jboss.messaging.core.remoting.MessageCache;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -50,11 +52,11 @@
 
    // Public --------------------------------------------------------
 
-   public void addCodecFilter(final DefaultIoFilterChainBuilder filterChain)
+   public void addCodecFilter(final DefaultIoFilterChainBuilder filterChain, int initialPacketFragmentSize, int packetFragmentSize, MessageCache messageCache)
    {
       assert filterChain != null;
 
-      filterChain.addLast("codec", new ProtocolCodecFilter(new MinaProtocolCodecFilter()));
+      filterChain.addLast("codec", new ProtocolCodecFilter(new MinaProtocolCodecFilter(initialPacketFragmentSize, packetFragmentSize, messageCache)));
    }
 
    public void addSSLFilter(

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -68,6 +68,11 @@
    
    // Public --------------------------------------------------------
 
+   /**
+    * Exposes the wrapped {@code IoBuffer} itself (not a copy).
+    *
+    * @return the underlying buffer instance
+    */
+   public IoBuffer getBuffer()
+   {
+      return buffer;
+   }
+
    // MessagingBuffer implementation ----------------------------------------------
 
    public byte[] array()
@@ -137,7 +142,14 @@
    
    public void putBytes(final byte[] bytes, int offset, int length)
    {
-      buffer.put(bytes, offset, length);
+      try
+      {
+         buffer.put(bytes, offset, length);
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+      }
    }
 
    public void putInt(final int intValue)
@@ -241,6 +253,17 @@
       }
    }
 
+   /**
+    * Writes a boolean at the given index of the wrapped buffer, encoded as the
+    * {@code TRUE} or {@code FALSE} constant.
+    *
+    * @param pos index to write at
+    * @param val value to encode
+    */
+   public void putBoolean(int pos, boolean val)
+   {
+      buffer.put(pos, val ? TRUE : FALSE);
+   }
+
    public boolean getBoolean()
    {
       byte b = buffer.get();

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -30,6 +30,8 @@
 import org.jboss.messaging.core.remoting.Acceptor;
 import org.jboss.messaging.core.remoting.CleanUpNotifier;
 import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.remoting.MessageCache;
+import org.jboss.messaging.core.remoting.impl.InMemoryMessageCache;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.ExecutorService;
@@ -50,6 +52,7 @@
    private CleanUpNotifier cleanupNotifier;
    private RemotingService remotingService;
    private FilterChainSupport chainSupport = new FilterChainSupportImpl();
+   private MessageCache messageCache = new InMemoryMessageCache();
 
    public void startAccepting(RemotingService remotingService, CleanUpNotifier cleanupNotifier) throws Exception
    {
@@ -73,7 +76,8 @@
                  .getTrustStorePath(), remotingService.getConfiguration()
                  .getTrustStorePassword());
       }
-      chainSupport.addCodecFilter(filterChain);
+      chainSupport.addCodecFilter(filterChain, remotingService.getConfiguration().getInitialPacketFragmentSize(),
+            remotingService.getConfiguration().getPacketFragmentSize(), messageCache);
 
       // Bind
       acceptor.setDefaultLocalAddress(new InetSocketAddress(remotingService.getConfiguration().getHost(), remotingService.getConfiguration().getPort()));
@@ -172,6 +176,7 @@
       {
          remotingService.unregisterPinger(session.getId());
          cleanupNotifier.fireCleanup(session.getId(), null);
+         messageCache.clear(session.getId());
       }
    }
 

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -51,16 +51,9 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.ping.Pinger;
 import org.jboss.messaging.core.ping.impl.PingerImpl;
-import org.jboss.messaging.core.remoting.CleanUpNotifier;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.PacketHandler;
-import org.jboss.messaging.core.remoting.PacketReturner;
-import org.jboss.messaging.core.remoting.RemotingConnector;
-import org.jboss.messaging.core.remoting.RemotingSession;
-import org.jboss.messaging.core.remoting.ResponseHandler;
-import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.*;
 import org.jboss.messaging.core.remoting.impl.ResponseHandlerImpl;
+import org.jboss.messaging.core.remoting.impl.InMemoryMessageCache;
 import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
 import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
 import org.jboss.messaging.util.MessagingBuffer;
@@ -106,6 +99,8 @@
 
    private boolean alive = true;
 
+   private MessageCache messageCache;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -156,6 +151,7 @@
       this.connector = connector;
       this.connectorConfig = connector.getSessionConfig();
       this.chainSupport = chainSupport;
+      messageCache = new InMemoryMessageCache();
       
       
       DefaultIoFilterChainBuilder filterChain = connector.getFilterChain();
@@ -176,7 +172,8 @@
             throw ise;
          }
       }
-      this.chainSupport.addCodecFilter(filterChain);
+      this.chainSupport.addCodecFilter(filterChain, connectionParams.getInitialPacketFragmentSize(),
+            connectionParams.getPacketFragmentSize(), messageCache);
       connectorConfig.setTcpNoDelay(connectionParams.isTcpNoDelay());
       int receiveBufferSize = connectionParams.getTcpReceiveBufferSize();
       if (receiveBufferSize != -1)
@@ -353,7 +350,7 @@
       {
          listener.sessionDestroyed(sessionID, me); 
       }
-
+      messageCache.clear(sessionID);
       session = null;
       connector = null;
    }

Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaDecoderHandler.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaDecoderHandler.java	                        (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaDecoderHandler.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import org.jboss.messaging.core.remoting.DecoderHandler;
+import org.jboss.messaging.core.remoting.Packet;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+
+/**
+ * {@link DecoderHandler} that forwards every {@link Packet} it is handed to a MINA
+ * {@link ProtocolDecoderOutput}.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class MinaDecoderHandler implements DecoderHandler
+{
+   /** Output supplied at construction; never reassigned, hence final. */
+   private final ProtocolDecoderOutput out;
+
+   /**
+    * @param out the decoder output that {@link #handle(Packet)} writes to
+    */
+   public MinaDecoderHandler(final ProtocolDecoderOutput out)
+   {
+      this.out = out;
+   }
+
+   /**
+    * Writes the packet to the decoder output.
+    *
+    * @param packet the packet to pass on
+    */
+   public void handle(Packet packet)
+   {
+      out.write(packet);
+   }
+}

Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaEncoderHandler.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaEncoderHandler.java	                        (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaEncoderHandler.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import org.jboss.messaging.util.MessagingBuffer;
+import org.jboss.messaging.core.remoting.EncoderHandler;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+
+/**
+ * {@link EncoderHandler} that writes the {@code IoBuffer} behind a
+ * {@link MessagingBuffer} to a MINA {@link ProtocolEncoderOutput}.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class MinaEncoderHandler implements EncoderHandler
+{
+   /** Output supplied at construction; never reassigned, hence final. */
+   private final ProtocolEncoderOutput out;
+
+   /**
+    * @param output the encoder output that {@link #handle(MessagingBuffer)} writes to
+    */
+   public MinaEncoderHandler(ProtocolEncoderOutput output)
+   {
+      this.out = output;
+   }
+
+   /**
+    * Unwraps the buffer and writes its underlying {@code IoBuffer} to the output.
+    *
+    * @param buffer must be an {@link IoBufferWrapper}; any other implementation
+    *               fails the cast below with a {@link ClassCastException}
+    */
+   public void handle(MessagingBuffer buffer)
+   {
+      IoBufferWrapper ioBufferWrapper = (IoBufferWrapper) buffer;
+      out.write(ioBufferWrapper.getBuffer());
+   }
+}

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -26,11 +26,14 @@
 import org.apache.mina.common.IoSession;
 import org.apache.mina.filter.codec.*;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.MessagingCodec;
-import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.*;
 import org.jboss.messaging.core.remoting.impl.MessagingCodecImpl;
+import org.jboss.messaging.core.remoting.impl.PacketAssemblerImpl;
 import org.jboss.messaging.util.MessagingBuffer;
+import static org.jboss.messaging.util.DataConstants.SIZE_INT;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * A Mina ProtocolEncoder used to encode/decode messages.
  * 
@@ -44,8 +47,19 @@
 
    // ProtocolCodecFactory implementation
    // -----------------------------------------------------------------------------------
-   private MessagingCodec messagingCodec = new MessagingCodecImpl();
 
+   private PacketAssembler packetAssembler;
+
+   private AtomicInteger packetId = new AtomicInteger(0);
+   private int initialPacketFragmentSize;
+
+   /**
+    * Creates the codec filter and the packet assembler it delegates to.
+    *
+    * @param initialPacketFragmentSize stored on this filter for later use
+    * @param packetFragmentSize        passed to the {@code PacketAssemblerImpl} constructor
+    * @param messageCache              passed to the {@code PacketAssemblerImpl} constructor
+    */
+   public MinaProtocolCodecFilter(int initialPacketFragmentSize, int packetFragmentSize, MessageCache messageCache)
+   {
+      super();
+      this.initialPacketFragmentSize = initialPacketFragmentSize;
+      packetAssembler = new PacketAssemblerImpl(packetFragmentSize, messageCache);
+   }
+
    public ProtocolDecoder getDecoder(final IoSession session)
    {
       return this;
@@ -66,15 +80,11 @@
                       final ProtocolEncoderOutput out) throws Exception
    {
 
-      IoBuffer iobuf = IoBuffer.allocate(1024, false);
-
+      IoBuffer iobuf = IoBuffer.allocate(initialPacketFragmentSize, false);
       iobuf.setAutoExpand(true);
-
       MessagingBuffer buffer = new IoBufferWrapper(iobuf);
-
-      messagingCodec.encode(buffer, message);
-
-      out.write(iobuf);
+      EncoderHandler outputHandler = new MinaEncoderHandler(out);
+      packetAssembler.disAssemble(buffer, outputHandler, message);
    }
 
    // CumulativeProtocolDecoder overrides
@@ -83,13 +93,8 @@
    public boolean doDecode(final IoSession session, final IoBuffer in, final ProtocolDecoderOutput out) throws Exception
    {
       MessagingBuffer buff = new IoBufferWrapper(in);
-
-      Packet packet = messagingCodec.decode(buff);
-      if(packet != null)
-      {
-         out.write(packet);
-      }
-      return packet != null;
+      DecoderHandler decoderHandler = new MinaDecoderHandler(out);
+      return packetAssembler.assemble(buff, decoderHandler, session.getId());
    }
 }
 

Added: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketFragmentBuffer.java
===================================================================
--- branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketFragmentBuffer.java	                        (rev 0)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketFragmentBuffer.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,17 @@
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.util.MessagingBuffer;
+import org.jboss.messaging.core.remoting.PacketFragment;
+
+import javax.transaction.xa.Xid;
+import java.util.List;
+
+/**
+ * A {@link MessagingBuffer} that additionally exposes a list of
+ * {@link PacketFragment}s.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface PacketFragmentBuffer extends MessagingBuffer
+{
+   /**
+    * @return the packet fragments associated with this buffer
+    */
+   List<PacketFragment> getPacketFragments();
+
+   // NOTE(review): the contract of prepare() is not visible from this interface -
+   // presumably it must be called before the fragments are read; confirm against the implementation.
+   void prepare();
+}

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -25,7 +25,6 @@
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.util.DataConstants;
 import org.jboss.messaging.util.MessagingBuffer;
 
 /**
@@ -173,10 +172,9 @@
    }
 
    public void encode(MessagingBuffer buffer)
-   {      
+   {
       //The standard header fields
-      buffer.putInt(0); //The length gets filled in at the end
-      buffer.putByte(type); 
+      buffer.putByte(type);
       buffer.putLong(responseTargetID);
       buffer.putLong(targetID);
       buffer.putLong(executorID);
@@ -184,11 +182,11 @@
       encodeBody(buffer);
       
       //The length doesn't include the actual length byte
-      int len = buffer.position() - DataConstants.SIZE_INT;
+      /*int len = bufferEncoder.position() - DataConstants.SIZE_INT;
       
-      buffer.putInt(0, len);
+      bufferEncoder.putInt(0, len);
       
-      buffer.flip();
+      buffer.flip();*/
    }
 
    public void decode(final MessagingBuffer buffer) throws Exception
@@ -241,4 +239,5 @@
 
    // Inner classes -------------------------------------------------
 
+
 }

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -89,13 +89,9 @@
    
    public void decodeBody(final MessagingBuffer buffer)
    {
-      //TODO can be optimised
-      
       serverMessage = new ServerMessageImpl();
       
       serverMessage.decode(buffer);
-      
-      serverMessage.getBody().flip();
    }
 
 

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReceiveMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReceiveMessage.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReceiveMessage.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -116,8 +116,7 @@
       clientMessage = new ClientMessageImpl(deliveryCount, deliveryID);
       
       clientMessage.decode(buffer);
-      
-      clientMessage.getBody().flip();
+
    }
 
    // Package protected ---------------------------------------------

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossBytesMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossBytesMessage.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossBytesMessage.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -25,7 +25,7 @@
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
+import org.jboss.messaging.util.MessagingBuffer;
 
 import javax.jms.BytesMessage;
 import javax.jms.JMSException;
@@ -415,7 +415,7 @@
    {
       super.clearBody();
 
-      body = new IoBufferWrapper(1024);
+      body = body.createNewBuffer(1024);
    }
 
    public long getBodyLength() throws JMSException
@@ -425,11 +425,11 @@
       return body.limit();
    }
 
-   public void doBeforeSend() throws Exception
+   public void encode(MessagingBuffer buffer)
    {
-      reset();
-
-      message.setBody(body);
+      super.encode(buffer);
+      body.rewind();
+      buffer.putBytes(body.array(), 0, body.limit());
    }
 
    // Public --------------------------------------------------------

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossMapMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossMapMessage.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossMapMessage.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -24,8 +24,10 @@
 
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketFragmentBuffer;
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.TypedProperties;
+import org.jboss.messaging.util.MessagingBuffer;
 
 import javax.jms.JMSException;
 import javax.jms.MapMessage;
@@ -439,12 +441,10 @@
       
       map.clear();
    }
-   
-   public void doBeforeSend() throws Exception
+
+   public void encode(MessagingBuffer buff)
    {
-      map.encode(body);
-      
-      super.doBeforeSend();
+      map.encode(buff);
    }
    
    public void doBeforeReceive() throws Exception

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossMessage.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -27,6 +27,7 @@
 import org.jboss.messaging.core.client.impl.ClientMessageImpl;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.MessageBodyEncoder;
 import org.jboss.messaging.jms.JBossDestination;
 import org.jboss.messaging.util.ByteBufferWrapper;
 import org.jboss.messaging.util.MessagingBuffer;
@@ -55,7 +56,7 @@
  *
  * $Id: JBossMessage.java 3466 2007-12-10 18:44:52Z timfox $
  */
-public class JBossMessage implements javax.jms.Message
+public class JBossMessage implements javax.jms.Message, MessageBodyEncoder
 {
    // Constants -----------------------------------------------------
 
@@ -187,7 +188,7 @@
    protected JBossMessage(final byte type, final ClientSession session)
    {
       message = session.createClientMessage(type, true, 0, System.currentTimeMillis(), (byte)4);
-      
+      message.setMessageBodyEncoder(this);
       //TODO - can we lazily create this?
       body = message.getBody();
    }
@@ -207,6 +208,8 @@
       this.readOnly = true;
       
       this.session = session;
+
+      message.setMessageBodyEncoder(this);
       
       this.body = message.getBody();
    }
@@ -876,13 +879,16 @@
    {
       return message;
    }
-   
-   public void doBeforeSend() throws Exception
+
+   public void encode(MessagingBuffer buffer)
    {
-      body.flip();
-      
-      message.setBody(body);
+      //do nothing
    }
+
+   /**
+    * Empty implementation: nothing is read from the buffer at this level.
+    *
+    * @param buffer unused here
+    * @param len    unused here
+    */
+   public void decode(MessagingBuffer buffer, int len)
+   {
+      // No-op stub (the body was left as generated by the IDE).
+   }
    
    public void doBeforeReceive() throws Exception
    {

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossMessageProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossMessageProducer.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossMessageProducer.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -412,19 +412,6 @@
 
       jbm.setJMSDestination(destination);
 
-      try
-      {
-         jbm.doBeforeSend();
-      }
-      catch (Exception e)
-      {
-         JMSException je = new JMSException(e.getMessage());
-         
-         je.initCause(e);
-         
-         throw je;
-      }
-
       JBossDestination dest = (JBossDestination)destination;
 
       try

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossObjectMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossObjectMessage.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossObjectMessage.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -25,6 +25,7 @@
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.util.ObjectInputStreamWithClassLoader;
+import org.jboss.messaging.util.MessagingBuffer;
 
 import javax.jms.JMSException;
 import javax.jms.ObjectMessage;
@@ -93,29 +94,37 @@
    {
       return JBossObjectMessage.TYPE;
    }
-   
-   public void doBeforeSend() throws Exception
+
+   /**
+    * Writes the object payload to the given buffer.
+    * <p>
+    * A non-null object is serialized with Java serialization and written as an int
+    * length followed by the serialized bytes; a null object is written as the single
+    * int {@code 0}.
+    *
+    * @param buffer the buffer to write to
+    * @throws RuntimeException if serialization fails; the originating
+    *         {@link IOException} is attached as the cause
+    */
+   public void encode(MessagingBuffer buffer)
    {
       if (object != null)
       {
          ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
-         
-         ObjectOutputStream oos = new ObjectOutputStream(baos);
-         
-         oos.writeObject(object);
-         
-         oos.flush();
-         
+
+         try
+         {
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+
+            oos.writeObject(object);
+
+            oos.flush();
+         }
+         catch (IOException e)
+         {
+            // Keep the cause so the failing class/field can be diagnosed.
+            throw new RuntimeException("Unable to serialize object", e);
+         }
+
          byte[] data = baos.toByteArray();
-         
-         body.putInt(data.length);
-         body.putBytes(data);
+
+         buffer.putInt(data.length);
+         buffer.putBytes(data);
       }
-      
-      super.doBeforeSend();
+      else
+      {
+         buffer.putInt(0);
+      }
    }
-   
-      
+
    // ObjectMessage implementation ----------------------------------
 
    public void setObject(Serializable object) throws JMSException

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossStreamMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossStreamMessage.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossStreamMessage.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -567,16 +567,9 @@
    {
       super.clearBody();
       
-      body = new IoBufferWrapper(1024);
+      body = body.createNewBuffer(1024);
    }
    
-   public void doBeforeSend() throws Exception
-   {
-      reset();
-      
-      message.setBody(body);
-   }
-   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossTextMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossTextMessage.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/jms/client/JBossTextMessage.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -26,6 +26,8 @@
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.util.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
 
 import javax.jms.JMSException;
 import javax.jms.TextMessage;
@@ -121,13 +123,17 @@
    }
 
    // JBossMessage override -----------------------------------------
-   
-   public void doBeforeSend() throws Exception
+
+   public void encode(MessagingBuffer buffer)
    {
-      body.putNullableString(text);      
-      
-      super.doBeforeSend();
+      buffer.putNullableString(text);
    }
+
+   /**
+    * Delegates to the superclass, then reads the text as a nullable string.
+    * <p>
+    * NOTE(review): the text is read from the {@code body} field rather than from the
+    * {@code buffer} argument, and {@code len} is only forwarded to the superclass -
+    * confirm this is intended.
+    *
+    * @param buffer forwarded to the superclass
+    * @param len    forwarded to the superclass
+    */
+   public void decode(MessagingBuffer buffer, int len)
+   {
+      super.decode(buffer, len);
+      text = body.getNullableString();
+   }
    
    public void doBeforeReceive() throws Exception
    {

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/util/ByteBufferWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/ByteBufferWrapper.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/util/ByteBufferWrapper.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -301,7 +301,18 @@
       }
 	}
 
-	public void putByte(byte val)
+   /**
+    * Writes a boolean at the given index of the wrapped buffer, encoded as the
+    * {@code TRUE} or {@code FALSE} constant.
+    *
+    * @param pos index to write at
+    * @param val value to encode
+    */
+   public void putBoolean(int pos, boolean val)
+   {
+      buffer.put(pos, val ? TRUE : FALSE);
+   }
+
+   public void putByte(byte val)
 	{
 		buffer.put(val);
 	}

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/util/ExpandingMessagingBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/ExpandingMessagingBuffer.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/util/ExpandingMessagingBuffer.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -211,6 +211,17 @@
       putByte((byte) (val ? -1 : 0));
    }
 
+   /**
+    * Writes a boolean at the given index of the backing buffer, encoded as the
+    * {@code TRUE} or {@code FALSE} constant.
+    * <p>
+    * NOTE(review): the relative {@code putBoolean(boolean)} in this class writes
+    * {@code (byte) -1} / {@code 0}; confirm that {@code TRUE}/{@code FALSE} use the
+    * same encoding.
+    *
+    * @param pos index to write at
+    * @param val value to encode
+    */
+   public void putBoolean(int pos, boolean val)
+   {
+      buf.put(pos, val ? TRUE : FALSE);
+   }
+
    public void putByte(byte val)
    {
       ensureRemaining(1).put(val);

Modified: branches/Branch_Message_Chunking/src/main/org/jboss/messaging/util/MessagingBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/MessagingBuffer.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/src/main/org/jboss/messaging/util/MessagingBuffer.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -50,9 +50,11 @@
 	void putFloat(float val);
 	
 	void putBoolean(boolean val);
-	
+
+   void putBoolean(int pos, boolean val);
+
 	void putChar(char val);
-	
+
 	void putNullableString(String val);
 	
 	void putString(String val);

Modified: branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -41,6 +41,7 @@
 import org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport;
 import org.jboss.messaging.core.remoting.impl.mina.FilterChainSupportImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.InMemoryMessageCache;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -84,7 +85,7 @@
       
       support.addSSLFilter(acceptor.getFilterChain(), false, keystorePath,
             keystorePassword, trustStorePath, trustStorePassword);
-      support.addCodecFilter(acceptor.getFilterChain());
+      support.addCodecFilter(acceptor.getFilterChain(), 2048, 1024, new InMemoryMessageCache());
       acceptor.setDefaultLocalAddress(address);
 
       final CountDownLatch latch = new CountDownLatch(1);
@@ -103,7 +104,7 @@
       NioSocketConnector connector = new NioSocketConnector();
       support.addSSLFilter(connector.getFilterChain(), true,
             keystorePath, keystorePassword, null, null);
-      support.addCodecFilter(connector.getFilterChain());
+      support.addCodecFilter(connector.getFilterChain(), 2048, 1024, new InMemoryMessageCache());
       connector.setHandler(new IoHandlerAdapter());
       ConnectFuture future = connector.connect(address).awaitUninterruptibly();
       IoSession session = future.getSession();

Modified: branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaSessionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaSessionTest.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaSessionTest.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -23,6 +23,8 @@
 package org.jboss.messaging.tests.integration.core.remoting.mina;
 
 import org.jboss.messaging.core.client.impl.LocationImpl;
+import org.jboss.messaging.core.client.impl.ConnectionParamsImpl;
+import org.jboss.messaging.core.client.ConnectionParams;
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.RemotingConnector;
@@ -56,7 +58,10 @@
    @Override
    protected RemotingConnector createNIOConnector(PacketDispatcher dispatcher)
    {
-      return new MinaConnector(new LocationImpl(TCP, "localhost", TestSupport.PORT), dispatcher);
+      ConnectionParams connectionParams = new ConnectionParamsImpl();
+      connectionParams.setPingInterval(0);
+      connectionParams.setCallTimeout(10000000);
+      return new MinaConnector(new LocationImpl(TCP, "localhost", TestSupport.PORT),connectionParams, dispatcher);
    }
 
    @Override

Added: branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/PacketFragmentBufferImplTest.java
===================================================================
--- branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/PacketFragmentBufferImplTest.java	                        (rev 0)
+++ branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/PacketFragmentBufferImplTest.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -0,0 +1,387 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.unit.core.remoting.impl;
+
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.tests.util.RandomUtil;
+import org.jboss.messaging.core.remoting.impl.PacketFragmentBufferImpl;
+import org.jboss.messaging.core.remoting.PacketFragment;
+import org.jboss.messaging.util.MessagingBuffer;
+import org.jboss.messaging.util.ByteBufferWrapper;
+import org.jboss.messaging.util.SimpleString;
+import static org.jboss.messaging.util.DataConstants.NOT_NULL;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class PacketFragmentBufferImplTest extends UnitTestCase
+{
+   private MessagingBuffer messagingBuffer;
+   private PacketFragmentBufferImpl buffer;
+
+   protected void setUp() throws Exception
+   {
+      messagingBuffer = new ByteBufferWrapper(ByteBuffer.allocate(1024));
+   }
+
+   public void testConstructor1()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      messagingBuffer.rewind();
+      assertEquals(messagingBuffer.getInt(), -1);
+      assertFalse(messagingBuffer.getBoolean());
+      assertEquals(messagingBuffer.getInt(), 10);
+      assertEquals(messagingBuffer.getInt(), 0);
+      assertEquals(buffer.getPacketFragments().size(), 1);
+      assertEquals(buffer.getPacketFragments().get(0).getDataStartPosition(), 13);
+      assertEquals(buffer.getPacketFragments().get(0).getDataStartPosition(), 13);
+   }
+
+   public void testPutFloat()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      messagingBuffer.position(1020);
+      buffer.putFloat(55);
+      assertEquals(buffer.getPacketFragments().size(), 1);
+      assertEquals(messagingBuffer.position(), 1024);
+   }
+
+   public void testPutFloatRollsOver()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      messagingBuffer.position(1021);
+      buffer.putFloat(55);
+      assertEquals(buffer.getPacketFragments().size(), 2);
+      assertEquals(messagingBuffer.position(), 1021);
+      MessagingBuffer secondBuffer = buffer.getPacketFragments().get(1).getMessagingBuffer();
+      assertEquals(secondBuffer.position(), 17);
+      secondBuffer.position(13);
+      assertEquals(secondBuffer.getFloat(), 55f);
+   }
+
+   public void testPutInt()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      messagingBuffer.position(1020);
+      buffer.putInt(55);
+      assertEquals(buffer.getPacketFragments().size(), 1);
+      assertEquals(messagingBuffer.position(), 1024);
+   }
+
+   public void testPutIntRollsOver()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      messagingBuffer.position(1021);
+      buffer.putInt(55);
+      assertEquals(buffer.getPacketFragments().size(), 2);
+      assertEquals(messagingBuffer.position(), 1021);
+      MessagingBuffer secondBuffer = buffer.getPacketFragments().get(1).getMessagingBuffer();
+      assertEquals(secondBuffer.position(), 17);
+      secondBuffer.position(13);
+      assertEquals(secondBuffer.getInt(), 55);
+   }
+
+   public void testPutByte()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      messagingBuffer.position(1023);
+      buffer.putByte((byte) 55);
+      assertEquals(buffer.getPacketFragments().size(), 1);
+      assertEquals(messagingBuffer.position(), 1024);
+   }
+
+   public void testPutByteRollsOver()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      messagingBuffer.position(1024);
+      buffer.putByte((byte) 55);
+      assertEquals(buffer.getPacketFragments().size(), 2);
+      assertEquals(messagingBuffer.position(), 1024);
+      MessagingBuffer secondBuffer = buffer.getPacketFragments().get(1).getMessagingBuffer();
+      assertEquals(secondBuffer.position(), 14);
+      secondBuffer.position(13);
+      assertEquals(secondBuffer.getByte(), (byte) 55);
+   }
+
+   public void testPutLong()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      messagingBuffer.position(1016);
+      buffer.putLong(55);
+      assertEquals(buffer.getPacketFragments().size(), 1);
+      assertEquals(messagingBuffer.position(), 1024);
+   }
+
+   public void testPutLongRollsOver()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      messagingBuffer.position(1017);
+      buffer.putLong(55);
+      assertEquals(buffer.getPacketFragments().size(), 2);
+      assertEquals(messagingBuffer.position(), 1017);
+      MessagingBuffer secondBuffer = buffer.getPacketFragments().get(1).getMessagingBuffer();
+      assertEquals(secondBuffer.position(), 21);
+      secondBuffer.position(13);
+      assertEquals(secondBuffer.getLong(), 55);
+   }
+
+   public void testPutBoolean()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      messagingBuffer.position(1023);
+      buffer.putBoolean(true);
+      assertEquals(buffer.getPacketFragments().size(), 1);
+      assertEquals(messagingBuffer.position(), 1024);
+   }
+
+   public void testPutBooleanRollsOver()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      messagingBuffer.position(1024);
+      buffer.putBoolean(true);
+      assertEquals(buffer.getPacketFragments().size(), 2);
+      assertEquals(messagingBuffer.position(), 1024);
+      MessagingBuffer secondBuffer = buffer.getPacketFragments().get(1).getMessagingBuffer();
+      assertEquals(secondBuffer.position(), 14);
+      secondBuffer.position(13);
+      assertEquals(secondBuffer.getBoolean(), true);
+   }
+
+   public void testPutChar()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      messagingBuffer.position(1022);
+      buffer.putChar('d');
+      assertEquals(buffer.getPacketFragments().size(), 1);
+      assertEquals(messagingBuffer.position(), 1024);
+   }
+
+   public void testPutCharRollsOver()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      messagingBuffer.position(1023);
+      buffer.putChar('d');
+      assertEquals(buffer.getPacketFragments().size(), 2);
+      assertEquals(messagingBuffer.position(), 1023);
+      MessagingBuffer secondBuffer = buffer.getPacketFragments().get(1).getMessagingBuffer();
+      assertEquals(secondBuffer.position(), 15);
+      secondBuffer.position(13);
+      assertEquals(secondBuffer.getChar(), 'd');
+   }
+
+   public void testPutNullableString()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      String s = "This is a String that is a 45 characters long";
+      messagingBuffer.position(1024 - 95);//45 * 2 + 4 (int) + 1 (byte)
+      buffer.putNullableString(s);
+      assertEquals(buffer.getPacketFragments().size(), 1);
+      assertEquals(messagingBuffer.position(), 1024);
+   }
+
+    public void testPutNullNullableString()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      String s = null;
+      messagingBuffer.position(1023);
+      buffer.putNullableString(s);
+      assertEquals(buffer.getPacketFragments().size(), 1);
+      assertEquals(messagingBuffer.position(), 1024);
+   }
+
+   public void testPutNullableStringSplitAcrossTwoBuffers()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      String s = "This is a String that is a 45 characters long";
+      messagingBuffer.position(1024 - 51);//23 * 2 + 4 (int) + 1 (byte): only the first 23 chars fit in this buffer
+      buffer.putNullableString(s);
+      assertEquals(buffer.getPacketFragments().size(), 2);
+      assertEquals(messagingBuffer.position(), 1024);
+      messagingBuffer.position(1024-51);
+      assertEquals(messagingBuffer.getByte(), NOT_NULL);
+      assertEquals(messagingBuffer.getInt(), 45);
+      String newString = "";
+      for(int i = 0; i < 23; i++)
+      {
+         newString+=messagingBuffer.getChar();
+      }
+      assertEquals(newString, "This is a String that i");
+      MessagingBuffer secondBuffer = buffer.getPacketFragments().get(1).getMessagingBuffer();
+      secondBuffer.position(13);
+      newString = "";
+      for(int i = 0; i < 22; i++)
+      {
+         newString+=secondBuffer.getChar();
+      }
+      assertEquals(newString, "s a 45 characters long");
+   }
+
+   public void testPutSimpleString()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      SimpleString s = new SimpleString("This is a String that is a 45 characters long");
+      messagingBuffer.position(1024 - s.getData().length - 4);//the length plus an int
+      buffer.putSimpleString(s);
+      assertEquals(buffer.getPacketFragments().size(), 1);
+      assertEquals(messagingBuffer.position(), 1024);
+      messagingBuffer.position(1024 - s.getData().length - 4);
+      assertEquals(messagingBuffer.getSimpleString(), s);
+   }
+
+   public void testPutSimpleStringSplitAcrossBuffers()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      SimpleString s = new SimpleString("This is a String that is a 45 characters long");
+      messagingBuffer.position(1024 - 23 - 4);//room for the int plus the first 23 bytes of data
+      buffer.putSimpleString(s);
+      assertEquals(buffer.getPacketFragments().size(), 2);
+      assertEquals(messagingBuffer.position(), 1024);
+      messagingBuffer.position(1024 - 23 - 4);
+      assertEquals(messagingBuffer.getInt(), 45 * 2);
+      for(int i = 0; i < 23; i++)
+      {
+         assertEquals(messagingBuffer.getByte(), s.getData()[i]);
+      }
+      MessagingBuffer secondBuffer = buffer.getPacketFragments().get(1).getMessagingBuffer();
+      secondBuffer.position(13);
+      for(int i = 0; i < 22; i++)
+      {
+         assertEquals(secondBuffer.getByte(), s.getData()[i + 23]);
+      }
+   }
+
+   public void testPutNullableSimpleString()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      SimpleString s = new SimpleString("This is a String that is a 45 characters long");
+      messagingBuffer.position(1024 - s.getData().length - 4 - 1);//the length plus an int and byte
+      buffer.putNullableSimpleString(s);
+      assertEquals(buffer.getPacketFragments().size(), 1);
+      assertEquals(messagingBuffer.position(), 1024);
+      messagingBuffer.position(1024 - s.getData().length - 4 - 1);
+      assertEquals(messagingBuffer.getNullableSimpleString(), s);
+   }
+
+    public void testPutNullNullableSimpleString()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      SimpleString s = null;
+      messagingBuffer.position(1023);//a null value takes up a single byte
+      buffer.putNullableSimpleString(s);
+      assertEquals(buffer.getPacketFragments().size(), 1);
+      assertEquals(messagingBuffer.position(), 1024);
+      messagingBuffer.position(1023);
+      assertEquals(messagingBuffer.getNullableSimpleString(), s);
+   }
+
+   public void testPutBytes()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      byte[] bytes = RandomUtil.randomBytes(5000);
+      buffer.putBytes(bytes);
+      assertEquals(buffer.getPacketFragments().size(), 5);
+      byte[] bytes2 = new byte[bytes.length];
+      for (PacketFragment b : buffer.getPacketFragments())
+      {
+         b.getMessagingBuffer().position(13);
+      }
+      for(int i = 13; i < 1024; i++)
+      {
+         assertEquals(messagingBuffer.getByte(), bytes[i-13]);
+      }
+      messagingBuffer = buffer.getPacketFragments().get(1).getMessagingBuffer();
+      for(int i = 1024 + 13; i < 2048; i++)
+      {
+         byte b = messagingBuffer.getByte();
+         assertEquals(b, bytes[i-26]);
+      }
+      messagingBuffer = buffer.getPacketFragments().get(2).getMessagingBuffer();
+      for(int i = 2048 + 13; i < 3072; i++)
+      {
+         byte b = messagingBuffer.getByte();
+         assertEquals(b, bytes[i-39]);
+      }
+      messagingBuffer = buffer.getPacketFragments().get(3).getMessagingBuffer();
+      for(int i = 3072 + 13; i < 4096; i++)
+      {
+         byte b = messagingBuffer.getByte();
+         assertEquals(b, bytes[i-52]);
+      }
+      messagingBuffer = buffer.getPacketFragments().get(4).getMessagingBuffer();
+      for(int i = 4096 + 13; i <= 5000; i++)
+      {
+         byte b = messagingBuffer.getByte();
+         assertEquals(b, bytes[i-65]);
+      }
+   }
+
+   public void testPutBytesOffset()
+   {
+      buffer = new PacketFragmentBufferImpl(10, messagingBuffer, 1024);
+      byte[] bytes = RandomUtil.randomBytes(7000);
+      byte[] middleBytes = new byte[5000];
+      buffer.putBytes(bytes, 1000, 5000);
+      System.arraycopy(bytes, 1000, middleBytes, 0, 5000);
+      assertEquals(buffer.getPacketFragments().size(), 5);
+      byte[] bytes2 = new byte[bytes.length];
+      for (PacketFragment b : buffer.getPacketFragments())
+      {
+         b.getMessagingBuffer().position(13);
+      }
+      for(int i = 13; i < 1024; i++)
+      {
+         assertEquals(messagingBuffer.getByte(), middleBytes[i-13]);
+      }
+      messagingBuffer = buffer.getPacketFragments().get(1).getMessagingBuffer();
+      for(int i = 1024 + 13; i < 2048; i++)
+      {
+         byte b = messagingBuffer.getByte();
+         assertEquals(b, middleBytes[i-26]);
+      }
+      messagingBuffer = buffer.getPacketFragments().get(2).getMessagingBuffer();
+      for(int i = 2048 + 13; i < 3072; i++)
+      {
+         byte b = messagingBuffer.getByte();
+         assertEquals(b, middleBytes[i-39]);
+      }
+      messagingBuffer = buffer.getPacketFragments().get(3).getMessagingBuffer();
+      for(int i = 3072 + 13; i < 4096; i++)
+      {
+         byte b = messagingBuffer.getByte();
+         assertEquals(b, middleBytes[i-52]);
+      }
+      messagingBuffer = buffer.getPacketFragments().get(4).getMessagingBuffer();
+      for(int i = 4096 + 13; i <= 5000; i++)
+      {
+         byte b = messagingBuffer.getByte();
+         assertEquals(b, middleBytes[i-65]);
+      }
+   }
+
+   protected void tearDown() throws Exception
+   {
+      messagingBuffer = null;
+      buffer = null;
+   }
+}

Modified: branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/SessionTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/SessionTestBase.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/SessionTestBase.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -87,7 +87,7 @@
       
       session.write(packet);
 
-      assertTrue(serverPacketHandler.await(2, SECONDS));
+      assertTrue(serverPacketHandler.await(20, SECONDS));
 
       List<Packet> messages = serverPacketHandler.getPackets();
       assertEquals(1, messages.size());

Modified: branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaConnectorTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaConnectorTest.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaConnectorTest.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -67,6 +67,7 @@
 import org.jboss.messaging.core.remoting.RemotingSession;
 import org.jboss.messaging.core.remoting.TransportType;
 import org.jboss.messaging.core.remoting.impl.ResponseHandlerImpl;
+import org.jboss.messaging.core.remoting.impl.InMemoryMessageCache;
 import org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport;
 import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
@@ -645,7 +646,7 @@
       
       delegateBuilder = new DefaultIoFilterChainBuilder();
 
-      filterChainSupport.addCodecFilter(delegateBuilder);
+      filterChainSupport.addCodecFilter(delegateBuilder, 2048, 1024, new InMemoryMessageCache());
       
       sessionConfig = EasyMock.createMock(SocketSessionConfig.class);
       

Modified: branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaProtocolCodecFilterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaProtocolCodecFilterTest.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/mina/MinaProtocolCodecFilterTest.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -271,7 +271,7 @@
       buffer.rewind();
       
       SimpleProtocolDecoderOutput out = new SimpleProtocolDecoderOutput();
-      MinaProtocolCodecFilter codec = new MinaProtocolCodecFilter();
+      MinaProtocolCodecFilter codec = new MinaProtocolCodecFilter(2048, -1, null);
       codec.doDecode(null, IoBuffer.wrap(buffer.array()), out);
       Object message = out.getMessage();
       assertTrue(message instanceof Packet);
@@ -304,7 +304,7 @@
       out.write(isA(IoBuffer.class));
       replay(session, out);
 
-      MinaProtocolCodecFilter filter = new MinaProtocolCodecFilter();
+      MinaProtocolCodecFilter filter = new MinaProtocolCodecFilter(2048, -1, null);
       filter.encode(session, message, out);
       
       verify(session, out);

Modified: branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossObjectMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossObjectMessageTest.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossObjectMessageTest.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -122,7 +122,7 @@
       JBossObjectMessage msg = new JBossObjectMessage();
       msg.setObject(object);
 
-      msg.doBeforeSend();
+      //msg.doBeforeSend();
 
       MessagingBuffer body = msg.getCoreMessage().getBody();
       assertEquals(data.length, body.getInt());

Modified: branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossTextMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossTextMessageTest.java	2008-07-23 00:17:58 UTC (rev 4713)
+++ branches/Branch_Message_Chunking/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossTextMessageTest.java	2008-07-23 11:27:02 UTC (rev 4715)
@@ -112,7 +112,7 @@
       JBossTextMessage msg = new JBossTextMessage();
       msg.setText(text);
       
-      msg.doBeforeSend();
+      //msg.doBeforeSend();
 
       MessagingBuffer body = msg.getCoreMessage().getBody();
       String s = body.getNullableString();




More information about the jboss-cvs-commits mailing list