[jboss-cvs] JBoss Messaging SVN: r4744 - in branches: Branch_Message_Chunking_new and 9 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jul 29 09:42:18 EDT 2008


Author: ataylor
Date: 2008-07-29 09:42:17 -0400 (Tue, 29 Jul 2008)
New Revision: 4744

Added:
   branches/Branch_Message_Chunking_new/
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/MessageCache.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketAssembler.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragment.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragmentBuffer.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentBufferImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentImpl.java
Modified:
   branches/Branch_Message_Chunking_new/build-messaging.xml
   branches/Branch_Message_Chunking_new/messaging.ipr
   branches/Branch_Message_Chunking_new/src/config/jbm-configuration.xml
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ConnectionParams.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/config/Configuration.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/MessagingBuffer.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/RemotingHandler.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ByteBufferWrapper.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ExpandingMessagingBuffer.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
Log:
new message chunking branch

Copied: branches/Branch_Message_Chunking_new (from rev 4739, trunk)

Modified: branches/Branch_Message_Chunking_new/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml	2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/build-messaging.xml	2008-07-29 13:42:17 UTC (rev 4744)
@@ -901,6 +901,7 @@
 
    <target name="runServer" depends="jar">
       <java classname="org.jboss.messaging.microcontainer.JBMBootstrapServer" fork="true">
+         <jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"/>
          <jvmarg value="-XX:+UseParallelGC"/>
          <jvmarg value="-Xms512M"/>
          <jvmarg value="-Xmx2048M"/>

Modified: branches/Branch_Message_Chunking_new/messaging.ipr
===================================================================
--- trunk/messaging.ipr	2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/messaging.ipr	2008-07-29 13:42:17 UTC (rev 4744)
@@ -47,6 +47,9 @@
   <component name="CompilerConfiguration">
     <option name="DEFAULT_COMPILER" value="Javac" />
     <option name="DEPLOY_AFTER_MAKE" value="0" />
+    <excludeFromCompile>
+      <file url="file://$PROJECT_DIR$/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java" />
+    </excludeFromCompile>
     <resourceExtensions>
       <entry name=".+\.(properties|xml|html|dtd|tld)" />
       <entry name=".+\.(gif|png|jpeg|jpg)" />

Modified: branches/Branch_Message_Chunking_new/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml	2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/config/jbm-configuration.xml	2008-07-29 13:42:17 UTC (rev 4744)
@@ -51,6 +51,13 @@
       <!--How long to wait for a returning pong after sending a ping message to a client/server.-->
       <!-- If no pong is received after this time resources are cleaned up-->
       <remoting-ping-timeout>5000</remoting-ping-timeout>
+
+      <!--The initial size of the buffer created for the transport-->
+      <remoting-initial-packet-fragment-size>1024</remoting-initial-packet-fragment-size>
+
+      <!--The maximum size of packet to send on the transport, except for the first packet which is the r
+      emoting-initial-packet-fragment-size parameter. setting to 0 means that packet chunking is disabled-->
+      <remoting-packet-fragment-size>1024</remoting-packet-fragment-size>
      
       <!--  if ssl is enabled, all remoting-ssl-* properties must be set -->
       <remoting-enable-ssl>false</remoting-enable-ssl>

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ConnectionParams.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ConnectionParams.java	2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ConnectionParams.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -75,5 +75,13 @@
 
    String getTrustStorePassword();
 
-   void setTrustStorePassword(String trustStorePassword);
+   void setTrustStorePassword(String trustStorePassword);       
+
+   int getPacketFragmentSize();
+
+   int getInitialPacketFragmentSize();
+
+   void setInitialPacketFragmentSize(int initialPacketFragmentSize);
+
+   void setPacketFragmentSize(int packetFragmentSize);
 }

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java	2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ConnectionParamsImpl.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -45,6 +45,10 @@
    public static final int DEFAULT_TCP_SEND_BUFFER_SIZE = 32 * 1024; // in bytes
    
    public static final boolean DEFAULT_SSL_ENABLED = false;
+
+   public static final Integer DEFAULT_INITIAL_PACKET_FRAGMENT_SIZE = 2048;
+
+   public static final Integer DEFAULT_PACKET_FRAGMENT_SIZE = 1024;
    
    public static final String SSL_KEYSTORE_PATH_PROPERTY_NAME = "jbm.remoting.ssl.keystore.path";
    
@@ -83,6 +87,10 @@
    private String trustStorePath;
    
    private String trustStorePassword;
+
+   private int initialPacketFragmentSize = DEFAULT_INITIAL_PACKET_FRAGMENT_SIZE;
+
+   private int packetFragmentSize = DEFAULT_PACKET_FRAGMENT_SIZE;
    
    public long getCallTimeout()
    {
@@ -202,6 +210,26 @@
    {
       this.trustStorePassword = trustStorePassword;
    }
+
+   public void setInitialPacketFragmentSize(int initialPacketFragmentSize)
+   {
+      this.initialPacketFragmentSize = initialPacketFragmentSize;
+   }
+
+   public int getInitialPacketFragmentSize()
+   {
+      return initialPacketFragmentSize;
+   }
+
+   public void setPacketFragmentSize(int packetFragmentSize)
+   {
+      this.packetFragmentSize = packetFragmentSize;
+   }
+
+   public int getPacketFragmentSize()
+   {
+      return packetFragmentSize;
+   }
         
    public boolean equals(Object other)
    {

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/config/Configuration.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.config;
 
@@ -32,117 +32,121 @@
 import org.jboss.messaging.core.server.JournalType;
 
 /**
- * 
+ *
  * A Configuration
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
 public interface Configuration extends Serializable
 {
    // General attributes -------------------------------------------------------------------
-   
+
    boolean isClustered();
-   
+
    void setClustered(boolean clustered);
 
    int getScheduledThreadPoolMaxSize();
-   
+
    void setScheduledThreadPoolMaxSize(int maxSize);
-   
+
    long getSecurityInvalidationInterval();
-   
+
    void setSecurityInvalidationInterval(long interval);
-   
+
    boolean isSecurityEnabled();
-   
+
    void setSecurityEnabled(boolean enabled);
-   
+
    boolean isRequireDestinations();
-   
+
    void setRequireDestinations(boolean require);
 
    // Remoting related attributes ----------------------------------------------------------
-   
+
    List<String> getInterceptorClassNames();
-   
+
    Set<String> getAcceptorFactoryClassNames();
-   
+
    ConnectionParams getConnectionParams();
-   
+
    TransportType getTransport();
-   
+
    void setTransport(TransportType transport);
 
    String getHost();
-   
+
    void setHost(String host);
 
    int getPort();
-   
+
    void setPort(int port);
 
    Location getLocation();
-         
+
    String getKeyStorePath();
-   
+
    void setKeyStorePath(String path);
 
    String getKeyStorePassword();
-   
+
    void setKeyStorePassword(String password);
 
    String getTrustStorePath();
-   
+
    void setTrustStorePath(String path);
 
    String getTrustStorePassword();
-   
+
    void setTrustStorePassword(String password);
-   
+
    boolean isSSLEnabled();
-   
+
    void setSSLEnabled(boolean enabled);
-   
+
    // Journal related attributes
-   
+
    String getBindingsDirectory();
-   
+
    void setBindingsDirectory(String dir);
 
    String getJournalDirectory();
-   
+
    void setJournalDirectory(String dir);
 
    JournalType getJournalType();
-   
+
    void setJournalType(JournalType type);
 
    boolean isJournalSyncTransactional();
-   
+
    void setJournalSyncTransactional(boolean sync);
-   
+
    boolean isJournalSyncNonTransactional();
-   
+
    void setJournalSyncNonTransactional(boolean sync);
 
    int getJournalFileSize();
-   
+
    void setJournalFileSize(int size);
 
    int getJournalMinFiles();
-   
+
    void setJournalMinFiles(int files);
-   
+
    int getJournalMaxAIO();
-   
+
    void setJournalMaxAIO(int maxAIO);
-   
+
    boolean isCreateBindingsDir();
-   
+
    void setCreateBindingsDir(boolean create);
 
    boolean isCreateJournalDir();
-   
+
    void setCreateJournalDir(boolean create);
+
+   int getInitialPacketFragmentSize();
+
+   int getPacketFragmentSize();
 }

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -91,8 +91,12 @@
    
    public static final int DEFAULT_MAX_AIO = 5000;
    
-   private static final long serialVersionUID = 4077088945050267843L;
+   private static final long serialVersionUID = 4077088945050267843L;   
 
+   public static final int INITIAL_PACKET_FRAGMENT_SIZE = 2048;
+
+   public static final int PACKET_FRAGMENT_SIZE = 1024;
+
    
    // Attributes -----------------------------------------------------------------------------
       
@@ -136,6 +140,10 @@
    protected String host = DEFAULT_HOST;
    
    protected int port = DEFAULT_PORT;
+
+   protected int initialPacketFragmentSize = INITIAL_PACKET_FRAGMENT_SIZE;
+
+   protected int packetFragmentSize = PACKET_FRAGMENT_SIZE;
    
    protected final ConnectionParams defaultConnectionParams = new ConnectionParamsImpl();
    
@@ -406,6 +414,16 @@
       this.securityEnabled = enabled;
    }
 
+   public int getInitialPacketFragmentSize()
+   {
+      return initialPacketFragmentSize;
+   }
+
+   public int getPacketFragmentSize()
+   {
+      return packetFragmentSize;
+   }
+
    public ConnectionParams getConnectionParams()
    {
       return this.defaultConnectionParams;

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -116,6 +116,10 @@
 
       trustStorePassword = getString(e, "remoting-ssl-truststore-password", null);
 
+      initialPacketFragmentSize = getInteger(e, "remoting-initial-packet-fragment-size", ConnectionParamsImpl.DEFAULT_INITIAL_PACKET_FRAGMENT_SIZE);
+
+      packetFragmentSize = getInteger(e, "remoting-packet-fragment-size", ConnectionParamsImpl.DEFAULT_PACKET_FRAGMENT_SIZE);
+
       defaultConnectionParams.setCallTimeout(callTimeout);
       
       defaultConnectionParams.setInVMOptimisationEnabled(inVMOptimisationEnabled);
@@ -129,6 +133,11 @@
       defaultConnectionParams.setPingInterval(pingInterval);
       
       defaultConnectionParams.setSSLEnabled(sslEnabled);
+
+      defaultConnectionParams.setInitialPacketFragmentSize(initialPacketFragmentSize);
+
+      defaultConnectionParams.setPacketFragmentSize(packetFragmentSize);
+
       
       NodeList interceptorNodes = e.getElementsByTagName("remoting-interceptors");
 

Added: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/MessageCache.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/MessageCache.java	                        (rev 0)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/MessageCache.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -0,0 +1,13 @@
+package org.jboss.messaging.core.remoting;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface MessageCache
+{
+   void cache(MessagingBuffer buffer, int length, long sessionId, int packetId, int correlationId);
+
+   MessagingBuffer retrieve(int length, long sessionId, int packetId, int correlationId);
+
+   void clear(long id);
+}

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/MessagingBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/MessagingBuffer.java	2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/MessagingBuffer.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -52,9 +52,11 @@
 	void putFloat(float val);
 	
 	void putBoolean(boolean val);
+
+   void putBoolean(int pos, boolean b);
+
+   void putChar(char val);
 	
-	void putChar(char val);
-	
 	void putNullableString(String val);
 	
 	void putString(String val);

Added: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketAssembler.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketAssembler.java	                        (rev 0)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketAssembler.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -0,0 +1,17 @@
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.core.remoting.spi.Connection;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface PacketAssembler
+{
+   Packet assemble(MessagingBuffer buffer, RemotingHandler handler, long id) throws Exception;
+
+   void disAssemble(MessagingBuffer buffer, Connection connection, Packet message);
+
+   int getInitialPacketFragmentSize();
+
+   int getPacketFragmentSize();
+}

Added: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragment.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragment.java	                        (rev 0)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragment.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -0,0 +1,86 @@
+package org.jboss.messaging.core.remoting;
+
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface PacketFragment
+{
+   int getPacketId();
+
+   int getCorrelationId();
+
+   int getLength();
+
+   boolean isLastPacket();
+
+   int getEndPosition();
+
+   boolean checkSpace(int size);
+
+   boolean hasReadSpace(int size);
+
+   void putFloat(float val);
+
+   void putInt(int i);
+
+   void putByte(byte b);
+
+   void putLong(long l);
+
+   void putBoolean(boolean b);
+
+   void putBytes(byte[] bytes, int i, int i1);
+
+   void putDouble(double val);
+
+   void putShort(short val);
+
+   void putChar(char val);
+
+   int getInt();
+
+   long getLong();
+
+   short getShort();
+
+   boolean getBoolean();
+
+   byte getByte();
+
+   void getBytes(byte[] data);
+
+   void getBytes(byte[] data, int read, int left);
+
+   float getFloat();
+
+   double getDouble();
+
+   char getChar();
+
+   short getUnsignedByte();
+
+   int getBodyLength();
+
+   void setBodyLength(int bodyLength);
+
+   void writeHeaders();
+
+   int remaining();
+
+   int limit();
+
+   PacketFragment createNextPacketFragment(int packetFragmentSize);
+
+   void putBoolean(int actualPos, boolean val);
+
+   void putInt(int actualPos, int val);
+
+   void rewind();
+
+   void flip();
+
+   void position(int position);
+
+   MessagingBuffer getMessagingBuffer();
+}

Added: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragmentBuffer.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragmentBuffer.java	                        (rev 0)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/PacketFragmentBuffer.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -0,0 +1,34 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import java.util.List;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface PacketFragmentBuffer extends MessagingBuffer
+{
+   List<PacketFragment> getPacketFragments();
+
+   void prepare();
+}

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/RemotingHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingHandler.java	2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/RemotingHandler.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -40,5 +40,7 @@
    
    Set<Long> scanForFailedConnections(long expirePeriod);
    
-   void removeLastPing(long connectionID);   
+   void removeLastPing(long connectionID);
+
+   Packet decode(final long connectionID, final MessagingBuffer in) throws Exception;
 }

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ByteBufferWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ByteBufferWrapper.java	2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ByteBufferWrapper.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -307,7 +307,19 @@
       }
 	}
 
-	public void putByte(byte val)
+   public void putBoolean(int pos, boolean b)
+   {
+      if (b)
+      {
+         buffer.put(pos, TRUE);
+      }
+      else
+      {
+         buffer.put(pos, FALSE);
+      }
+   }
+
+   public void putByte(byte val)
 	{
 		buffer.put(val);
 	}

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java	2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -30,12 +30,7 @@
 import org.jboss.messaging.core.client.Location;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
-import org.jboss.messaging.core.remoting.ConnectionRegistry;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.RemotingHandler;
-import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.*;
 import org.jboss.messaging.core.remoting.spi.Connector;
 import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
 import org.jboss.messaging.core.remoting.spi.Connection;
@@ -88,17 +83,21 @@
          return connection;
       }
       else
-      {                        
+      {
+         MessageCache messageCache = new InMemoryMessageCache();
+
+         PacketAssembler assembler = new PacketAssemblerImpl(connectionParams.getInitialPacketFragmentSize(), connectionParams.getPacketFragmentSize(), messageCache);
+
          PacketDispatcher dispatcher = new PacketDispatcherImpl(null);
          
-         RemotingHandler handler = new RemotingHandlerImpl(dispatcher, null);
+         RemotingHandler handler = new RemotingHandlerImpl(dispatcher, null, assembler);
          
          Connector connector = createConnector(location, connectionParams, handler, this);
          
          connector.start();
          
          Connection tc = connector.createConnection();
-         
+
          if (tc == null)
          {
             throw new IllegalStateException("Failed to connect to " + location);
@@ -109,12 +108,12 @@
          if (pingInterval != -1)
          {
             connection = new RemotingConnectionImpl(tc, dispatcher, location,
-               connectionParams.getCallTimeout(), connectionParams.getPingInterval(), pingExecutor);
+               connectionParams.getCallTimeout(), connectionParams.getPingInterval(), pingExecutor, assembler);
          }
          else
          {
             connection = new RemotingConnectionImpl(tc, dispatcher, location,
-                  connectionParams.getCallTimeout());
+                  connectionParams.getCallTimeout(), assembler);
          }
 
          remotingConnections.put(tc.getID(), connection);

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ExpandingMessagingBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ExpandingMessagingBuffer.java	2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/ExpandingMessagingBuffer.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -213,6 +213,18 @@
       putByte((byte) (val ? -1 : 0));
    }
 
+   public void putBoolean(int pos, boolean b)
+   {
+      if (b)
+      {
+         buf.put(pos, TRUE);
+      }
+      else
+      {
+         buf.put(pos, FALSE);
+      }
+   }
+
    public void putByte(byte val)
    {
       ensureRemaining(1).put(val);

Added: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java	                        (rev 0)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/InMemoryMessageCache.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -0,0 +1,82 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.core.remoting.MessageCache;
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.List;
+import java.util.HashMap;
+import java.util.ArrayList;
+
+/**
+ * A simple in memory cache for message buffers.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class InMemoryMessageCache implements MessageCache
+{
+   Map<Long, Map<Integer, List<MessagingBuffer>>> cache = new HashMap<Long, Map<Integer, List<MessagingBuffer>>>();
+   
+   public void cache(MessagingBuffer buffer, int length, long sessionId, int packetId, int correlationID)
+   {
+      getByteList(sessionId, packetId).add(buffer);
+   }
+
+   public MessagingBuffer retrieve(int length, long sessionId, int packetId, int correlationId)
+   {
+      List<MessagingBuffer> buffers = getByteList(sessionId, packetId);
+      MessagingBuffer buffer = buffers.remove(0);
+      if(buffers.isEmpty())
+      {
+         Map<Integer, List<MessagingBuffer>> sessionMap = cache.get(sessionId);
+         sessionMap.remove(packetId);
+         if(sessionMap.isEmpty())
+         {
+            cache.remove(sessionId);
+         }
+      }
+      return buffer;
+   }
+
+   public void clear(long id)
+   {
+      cache.remove(id);
+   }
+
+
+   protected List<MessagingBuffer> getByteList(long sessionId, int packetId)
+   {
+      if(cache.get(sessionId) == null)
+      {
+         cache.put(sessionId, new HashMap<Integer, List<MessagingBuffer>>());
+      }
+      if(cache.get(sessionId).get(packetId) == null)
+      {
+         cache.get(sessionId).put(packetId, new ArrayList<MessagingBuffer>());
+      }
+      return cache.get(sessionId).get(packetId);
+   }
+
+}

Added: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java	                        (rev 0)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketAssemblerImpl.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -0,0 +1,160 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.core.remoting.*;
+import org.jboss.messaging.core.remoting.spi.Connection;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.util.DataConstants;
+
+import java.util.Map;
+import java.util.List;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class PacketAssemblerImpl implements PacketAssembler
+{
+   private static final Logger log = Logger.getLogger(PacketAssemblerImpl.class);
+   Map<Long, Map<Integer, List<PacketFragment>>> allFragments = new HashMap<Long, Map<Integer, List<PacketFragment>>>();
+   private AtomicInteger packetId = new AtomicInteger(0);
+   private int packetFragmentSize;
+   private boolean useMessageChunking = false;
+   private MessageCache messageCache;
+   private int initialPacketFragmentSize;
+
+
+   public PacketAssemblerImpl(final int initialPacketFragmentSize,final int packetFragmentSize,final MessageCache messageCache)
+   {
+      this.packetFragmentSize = packetFragmentSize;
+      this.initialPacketFragmentSize = initialPacketFragmentSize;
+      useMessageChunking = this.packetFragmentSize > 0;
+      this.messageCache = messageCache;
+   }
+
+   public Packet assemble(final MessagingBuffer buffer,final RemotingHandler handler,final long id) throws Exception
+   {
+      int length = buffer.getInt();
+      if (useMessageChunking)
+      {
+         return assembleMulti(buffer, handler, id, length);
+      }
+      else
+      {
+         return assembleSingle(buffer, handler, id, length);
+      }
+   }
+
+
+   public void disAssemble(final MessagingBuffer buffer, final Connection connection, final Packet message)
+   {
+      if (useMessageChunking)
+      {
+         disasembleMulti(buffer, connection, message);
+      }
+      else
+      {
+         disasembleSingle(buffer, connection, message);
+      }
+   }
+
+   public int getInitialPacketFragmentSize()
+   {
+      return initialPacketFragmentSize;
+   }
+
+   public int getPacketFragmentSize()
+   {
+      return packetFragmentSize;
+   }
+
+   private List<PacketFragment> getFragments(final long id, final int packetId)
+   {
+      if(allFragments.get(id) == null)
+      {
+         allFragments.put(id, new HashMap<Integer, List<PacketFragment>>());
+      }
+      if(allFragments.get(id).get(packetId) == null)
+      {
+         allFragments.get(id).put(packetId, new ArrayList<PacketFragment>());
+      }
+      return allFragments.get(id).get(packetId);
+   }
+
+
+   private void disasembleMulti(final MessagingBuffer buffer, final Connection connection, final Packet message)
+   {
+      PacketFragmentBuffer buff = new PacketFragmentBufferImpl(packetId.getAndIncrement(), buffer, packetFragmentSize);
+      message.encode(buff);
+      buff.prepare();
+      buff.flip();
+      for (PacketFragment packetFragment : buff.getPacketFragments())
+      {
+         connection.write(packetFragment.getMessagingBuffer());
+      }
+   }
+
+   private void disasembleSingle(MessagingBuffer buffer, Connection connection, Packet message)
+   {
+      buffer.putInt(-1);
+      message.encode(buffer);
+      //The length doesn't include the actual length byte
+      int len = buffer.position() - DataConstants.SIZE_INT;
+      buffer.putInt(0, len);
+      buffer.flip();
+      connection.write(buffer);
+   }
+
+   private Packet assembleMulti(final MessagingBuffer buffer, final RemotingHandler handler, final long id, final int length)
+         throws Exception
+   {
+      boolean lastPacket = buffer.getBoolean();
+      int packetId = buffer.getInt();
+      int correlationId = buffer.getInt();
+
+      if(lastPacket)
+      {
+         PacketFragmentImpl fragment = new PacketFragmentImpl(packetId, lastPacket, correlationId, length, buffer);
+         getFragments(id, fragment.getPacketId()).add(fragment);
+         List<PacketFragment> fragments = allFragments.get(id).remove(fragment.getPacketId());
+         PacketFragmentBuffer buff = new PacketFragmentBufferImpl(fragments, packetFragmentSize);
+         return handler.decode(id, buff);
+      }
+      else
+      {
+         //if we are part of a multi part packet then we need to be cached for later use
+         PacketFragmentImpl fragment = new PacketFragmentImpl(packetId, lastPacket, correlationId, length, messageCache,id);
+         getFragments(id, fragment.getPacketId()).add(fragment);
+         messageCache.cache(buffer, length - 9, id, packetId, correlationId);
+         return null;
+      }
+   }
+
+   private Packet assembleSingle(final MessagingBuffer buffer, final RemotingHandler handler, final long id, final int length)
+         throws Exception
+   {
+      return handler.decode(id, buffer);
+   }
+}

Added: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentBufferImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentBufferImpl.java	                        (rev 0)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentBufferImpl.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -0,0 +1,533 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.core.remoting.PacketFragment;
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.remoting.PacketFragmentBuffer;
+import static org.jboss.messaging.util.DataConstants.*;
+import org.jboss.messaging.util.SimpleString;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A MessagingBuffer that can be used to break up the buffers into smaller pieces.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class PacketFragmentBufferImpl implements PacketFragmentBuffer
+{
+
+   private static final Charset utf8 = Charset.forName("UTF-8");
+
+   private List<PacketFragment> packetFragments;
+   private int currentPos = 0;
+   private int size = 0;
+   private int correlationId = 0;
+   private int packetFragmentSize;
+   private int position = 0;
+
+   public PacketFragmentBufferImpl(int packetId, MessagingBuffer buffer, int packetFragmentSize)
+   {
+      packetFragments = new ArrayList<PacketFragment>();
+      packetFragments.add(new PacketFragmentImpl(packetId, correlationId++, buffer));
+      this.packetFragmentSize = packetFragmentSize;
+   }
+
+   public PacketFragmentBufferImpl(List<PacketFragment> packetFragments, int packetFragmentSize)
+   {
+      if(packetFragments == null || packetFragments.size() == 0)
+      {
+         throw new IllegalArgumentException("packet fragments must not be null or empty");
+      }
+      this.packetFragments = packetFragments;
+      currentPos = 0;
+      this.packetFragmentSize = packetFragmentSize;
+   }
+
+   public void putFloat(float val)
+   {
+      checkWriteSpace(SIZE_FLOAT);
+      getCurrentFragment().putFloat(val);
+      position += SIZE_FLOAT;
+   }
+
+   public void putInt(int i)
+   {
+      checkWriteSpace(SIZE_INT);
+      getCurrentFragment().putInt(i);
+      position += SIZE_INT;
+   }
+
+   public void putByte(byte b)
+   {
+      checkWriteSpace(SIZE_BYTE);
+      getCurrentFragment().putByte(b);
+      position += SIZE_BYTE;
+   }
+
+   public void putLong(long l)
+   {
+      checkWriteSpace(SIZE_LONG);
+      getCurrentFragment().putLong(l);
+      position += SIZE_LONG;
+   }
+
+   public void putBoolean(boolean b)
+   {
+      checkWriteSpace(SIZE_BOOLEAN);
+      getCurrentFragment().putBoolean(b);
+      position += SIZE_BOOLEAN;
+   }
+
+   public void putNullableString(String nullableString)
+   {
+      if (nullableString == null)
+		{
+			putByte(NULL);
+		}
+		else
+		{
+			putByte(NOT_NULL);
+
+			putString(nullableString);
+		}
+   }
+
+   public void putSimpleString(SimpleString string)
+   {
+      byte[] data = string.getData();
+
+   	putInt(data.length);
+   	putBytes(data);
+   }
+
+   public void putNullableSimpleString(SimpleString string)
+   {
+      if (string == null)
+   	{
+   		putByte(NULL);
+   	}
+   	else
+   	{
+   	   putByte(NOT_NULL);
+   		putSimpleString(string);
+   	}
+   }
+
+   public void putBytes(byte[] bytes, int i, int i1)
+   {
+      if(getCurrentFragment().checkSpace(i1 - i))
+      {
+         getCurrentFragment().putBytes(bytes, i, i1);
+      }
+      else
+      {
+         int size = bytes.length;
+         int written = 0;
+         while(written < i1)
+         {
+            int left = getCurrentFragment().remaining();
+            int towrite = left + written > i1?i1 - written:left;
+            getCurrentFragment().putBytes(bytes, i, towrite);
+            written+=towrite;
+            i += left;
+            if(written < i1)
+            {
+               setNextFragment();
+            }
+         }
+      }
+      position += i1;
+   }
+
+   public void putBytes(byte[] bytes)
+   {
+      putBytes(bytes, 0, bytes.length);
+   }
+
+   public void putShort(short val)
+   {
+      checkWriteSpace(SIZE_SHORT);
+      getCurrentFragment().putShort(val);
+      position += SIZE_SHORT;
+   }
+
+   public void putDouble(double val)
+   {
+      checkWriteSpace(SIZE_DOUBLE);
+      getCurrentFragment().putDouble(val);
+      position += SIZE_DOUBLE;
+   }
+
+   public void putChar(char val)
+   {
+      checkWriteSpace(SIZE_CHAR);
+      getCurrentFragment().putChar(val);
+      position += SIZE_CHAR;
+   }
+
+   public void putUTF(String str)
+   {
+      //TODO This is quite inefficient - can be improved using a method similar to what MINA IOBuffer does
+		//(putPrefixedString)
+		ByteBuffer bb = utf8.encode(str);
+   	putInt(bb.limit() - bb.position());
+   	putBytes(bb.array());
+   }
+
+   public void putBoolean(int pos, boolean val)
+   {
+      int currentpos = 0;
+      for (PacketFragment packetFragment : packetFragments)
+      {
+         if(pos < packetFragment.getBodyLength())
+         {
+            packetFragment.putBoolean(pos - currentpos, val);
+            return;
+         }
+         currentpos += packetFragment.getBodyLength();
+      }
+      throw new IndexOutOfBoundsException();
+   }
+
+   public void putInt(int pos, int val)
+   {
+      int currentpos = 0;
+      for (PacketFragment packetFragment : packetFragments)
+      {
+         if(pos < packetFragment.getBodyLength())
+         {
+            packetFragment.putInt(pos - currentpos, val);
+            return;
+         }
+         currentpos += packetFragment.getBodyLength();
+      }
+      throw new IndexOutOfBoundsException();
+   }
+
+
+   public void putString(String string)
+   {
+      putInt(string.length());
+
+		for (int i = 0; i < string.length(); i++)
+		{
+			putChar(string.charAt(i));
+		}
+   }
+
+   public byte[] array()
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public int capacity()
+   {
+      return 0;
+   }
+
+   public void limit(int limit)
+   {
+
+   }
+
+   public void position(int position)
+   {
+      int currentpos = 0;
+      for (int i = 0; i < packetFragments.size(); i++)
+      {
+         PacketFragment packetFragment = packetFragments.get(i);
+         currentpos += packetFragment.getBodyLength();
+         if (position < packetFragment.getBodyLength())
+         {
+            currentpos = i;
+            this.position = position;
+            packetFragment.position(position - currentpos);
+            return;
+         }
+      }
+   }
+
+   public int position()
+   {
+      return position;
+   }
+
+   public MessagingBuffer slice()
+   {
+      return null;
+   }
+
+   public MessagingBuffer createNewBuffer(int len)
+   {
+      return getCurrentFragment().getMessagingBuffer().createNewBuffer(len);
+   }
+
+
+   public String getString()
+   {
+      int len = getInt();
+
+   	char[] chars = new char[len];
+
+      for (int i = 0; i < len; i++)
+      {
+      	chars[i] = getChar();
+      }
+
+      return new String(chars);
+   }
+
+   public void getBytes(byte[] value, int i, int read)
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public String getUTF() throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void flip()
+   {
+      for (PacketFragment packetFragment : packetFragments)
+      {
+         packetFragment.flip();
+      }
+      currentPos = 0;
+      position = 0;
+   }
+
+   public void rewind()
+   {
+      for (PacketFragment packetFragment : packetFragments)
+      {
+         packetFragment.rewind();
+      }
+      currentPos = 0;
+      position = 0;
+   }
+
+
+   public int getInt()
+   {
+      checkReadSpace(SIZE_INT);
+      position += SIZE_INT;
+      return getCurrentFragment().getInt();
+   }
+
+   public long getLong()
+   {
+      checkReadSpace(SIZE_LONG);
+      position += SIZE_LONG;
+      return getCurrentFragment().getLong();
+   }
+
+   public short getShort()
+   {
+      checkReadSpace(SIZE_SHORT);
+      position += SIZE_SHORT;
+      return getCurrentFragment().getShort();
+   }
+
+   public boolean getBoolean()
+   {
+      checkReadSpace(SIZE_BOOLEAN);
+      position += SIZE_BOOLEAN;
+      return getCurrentFragment().getBoolean();
+   }
+
+   public String getNullableString()
+   {
+      byte check = getByte();
+
+		if (check == NULL)
+		{
+			return null;
+		}
+		else
+		{
+			return getString();
+		}
+   }
+
+   public SimpleString getSimpleString()
+   {
+      int len = getInt();
+
+   	byte[] data = new byte[len];
+   	getBytes(data);
+
+   	return new SimpleString(data);
+   }
+
+    public SimpleString getNullableSimpleString()
+   {
+      int b = getByte();
+   	if (b == NULL)
+   	{
+   	   return null;
+   	}
+   	else
+   	{
+         return getSimpleString();
+   	}
+   }
+
+   public byte getByte()
+   {
+      checkReadSpace(SIZE_BYTE);
+      position += SIZE_BYTE;
+      return getCurrentFragment().getByte();
+   }
+
+   public void getBytes(byte[] data)
+   {
+      int left = getCurrentFragment().remaining();
+      if(data.length <= left)
+      {
+         getCurrentFragment().getBytes(data);
+      }
+      else
+      {
+         int size = data.length;
+         int read = 0;
+         while (read < size)
+         {
+            left = getCurrentFragment().remaining() <= (size - read)? getCurrentFragment().remaining():size - read;
+
+            getCurrentFragment().getBytes(data, read, left);
+            read += left;
+            if(read < size)
+            {
+               currentPos++;
+            }
+
+         }
+
+      }
+      position += data.length;
+   }
+
+
+   public float getFloat()
+   {
+      checkReadSpace(SIZE_FLOAT);
+      position += SIZE_FLOAT;
+      return getCurrentFragment().getFloat();
+   }
+
+   public double getDouble()
+   {
+      checkReadSpace(SIZE_DOUBLE);
+      position += SIZE_DOUBLE;
+      return getCurrentFragment().getDouble();
+   }
+
+   public char getChar()
+   {
+      checkReadSpace(SIZE_CHAR);
+      position += SIZE_CHAR;
+      return getCurrentFragment().getChar();
+   }
+
+   public List<PacketFragment> getPacketFragments()
+   {
+      return packetFragments;
+   }
+
+   public void prepare()
+   {
+      for (PacketFragment packetFragment : packetFragments)
+      {
+         packetFragment.writeHeaders();
+      }
+   }
+
+   public short getUnsignedByte()
+   {
+      checkReadSpace(SIZE_SHORT);
+      return getCurrentFragment().getUnsignedByte();
+   }
+
+   public int getUnsignedShort()
+   {
+     checkReadSpace(SIZE_INT);
+      return getCurrentFragment().getUnsignedByte();
+   }
+
+   public int remaining()
+   {
+      return getCurrentFragment().remaining();
+   }
+
+   public int limit()
+   {
+      int currentLimit = 0;
+      for (PacketFragment packetFragment : packetFragments)
+      {
+         currentLimit += packetFragment.limit();
+      }
+      return currentLimit;
+   }
+
+    public Object getUnderlyingBuffer()
+    {
+        return null;
+    }
+
+    // Private -------------------------------------------------------
+
+   private void checkReadSpace(int size)
+   {
+      if(!getCurrentFragment().checkSpace(size))
+      {
+         currentPos++;
+      }
+   }
+
+   private void checkWriteSpace(int size)
+   {
+      if(!getCurrentFragment().checkSpace(size))
+      {
+         setNextFragment();
+      }
+   }
+
+   private void setNextFragment()
+   {
+      packetFragments.add(getCurrentFragment().createNextPacketFragment(packetFragmentSize));
+      currentPos++;
+   }
+
+
+   private PacketFragment getCurrentFragment()
+   {
+      return packetFragments.get(currentPos);
+   }
+
+
+}

Added: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentImpl.java	                        (rev 0)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/PacketFragmentImpl.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -0,0 +1,295 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.core.remoting.MessageCache;
+import org.jboss.messaging.core.remoting.PacketFragment;
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import static org.jboss.messaging.util.DataConstants.*;
+
+/**
+ * A fragment of a full packet that holds a portion of a full message within a buffer.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class PacketFragmentImpl implements PacketFragment
+{
+   private int bodyLength = 0;
+   private int packetId;
+   private int correlationId;
+   private boolean lastPacket;
+   private long sessionId;
+   private int dataStartPosition = -1;
+   private MessagingBuffer messagingBuffer;
+   private MessageCache messageCache;
+   private static final int HEADER_SIZE = (SIZE_INT * 3) + SIZE_BOOLEAN;
+
+   public PacketFragmentImpl(int packetId, boolean lastPacket, int correlationId, int length, MessageCache messageCache, long id)
+   {
+      this.packetId = packetId;
+      this.lastPacket = lastPacket;
+      this.correlationId = correlationId;
+      this.messageCache = messageCache;
+      this.bodyLength = length - HEADER_SIZE;
+      this.sessionId = id;
+      dataStartPosition = 0;
+   }
+
+    public PacketFragmentImpl(int packetId, boolean lastPacket, int correlationId, int length, MessagingBuffer messagingBuffer)
+   {
+      this.packetId = packetId;
+      this.lastPacket = lastPacket;
+      this.correlationId = correlationId;
+      this.bodyLength = length - HEADER_SIZE;
+      this.messagingBuffer = messagingBuffer;
+      dataStartPosition = messagingBuffer.position();
+   }
+
+   public PacketFragmentImpl(int packetId, int correlationId, MessagingBuffer messagingBuffer)
+   {
+      this.packetId = packetId;
+      this.lastPacket = true;
+      this.correlationId = correlationId;
+      this.messagingBuffer = messagingBuffer;
+      messagingBuffer.putInt(-1);
+      messagingBuffer.putBoolean(true);
+      messagingBuffer.putInt(packetId);
+      messagingBuffer.putInt(correlationId);
+      dataStartPosition = messagingBuffer.position();
+   }
+
+   public int getPacketId()
+   {
+      return packetId;
+   }
+
+   public int getCorrelationId()
+   {
+      return correlationId;
+   }
+
+   public int getLength()
+   {
+      return bodyLength + HEADER_SIZE;
+   }
+
+   public boolean isLastPacket()
+   {
+      return lastPacket;
+   }
+
+   public MessagingBuffer getMessagingBuffer()
+   {
+      if(messagingBuffer == null)
+      {
+         messagingBuffer = messageCache.retrieve(getLength(), sessionId, packetId, correlationId);
+      }
+      return messagingBuffer;
+   }
+
+   public int getEndPosition()
+   {
+      return dataStartPosition + getLength() - HEADER_SIZE;
+   }
+
+   public boolean checkSpace(int size)
+   {
+      int left = getMessagingBuffer().limit() - getMessagingBuffer().position();
+      return size<=left;
+   }
+
+   public boolean hasReadSpace(int size)
+   {
+      return getMessagingBuffer().position() + size <= getMessagingBuffer().limit();
+   }
+
+   public void putFloat(float val)
+   {
+      getMessagingBuffer().putFloat(val);
+      bodyLength += SIZE_FLOAT;
+   }
+
+   public void putInt(int i)
+   {
+      getMessagingBuffer().putInt(i);
+      bodyLength += SIZE_INT;
+   }
+
+   public void putByte(byte b)
+   {
+      getMessagingBuffer().putByte(b);
+      bodyLength += SIZE_BYTE;
+   }
+
+   public void putLong(long l)
+   {
+      getMessagingBuffer().putLong(l);
+      bodyLength += SIZE_LONG;
+   }
+
+   public void putBytes(byte[] bytes, int i, int i1)
+   {
+      getMessagingBuffer().putBytes(bytes, i, i1);
+      bodyLength += i1;
+   }
+
+   public void putDouble(double val)
+   {
+      getMessagingBuffer().putDouble(val);
+      bodyLength += SIZE_DOUBLE;
+   }
+
+   public void putShort(short val)
+   {
+      getMessagingBuffer().putShort(val);
+      bodyLength += SIZE_SHORT;
+   }
+
+   public void putChar(char val)
+   {
+      getMessagingBuffer().putChar(val);
+      bodyLength += SIZE_CHAR;
+   }
+
+   public void putBoolean(boolean b)
+   {
+      getMessagingBuffer().putBoolean(b);
+      bodyLength += SIZE_BOOLEAN;
+   }
+
+   public int getInt()
+   {
+      return getMessagingBuffer().getInt();
+   }
+
+   public long getLong()
+   {
+      return getMessagingBuffer().getLong();
+   }
+
+   public short getShort()
+   {
+      return getMessagingBuffer().getShort();
+   }
+
+   public boolean getBoolean()
+   {
+      return getMessagingBuffer().getBoolean();
+   }
+
+   public byte getByte()
+   {
+      return getMessagingBuffer().getByte();
+   }
+
+   public void getBytes(byte[] data)
+   {
+      getMessagingBuffer().getBytes(data);
+   }
+
+   public void getBytes(byte[] data, int read, int left)
+   {
+      getMessagingBuffer().getBytes(data, read, left);
+   }
+
+   public float getFloat()
+   {
+      return getMessagingBuffer().getFloat();
+   }
+
+   public double getDouble()
+   {
+      return getMessagingBuffer().getDouble();
+   }
+
+   public char getChar()
+   {
+      return getMessagingBuffer().getChar();
+   }
+
+   public short getUnsignedByte()
+   {
+      return getMessagingBuffer().getUnsignedByte();
+   }
+
+   public int getBodyLength()
+   {
+      return bodyLength;
+   }
+
+   public void setBodyLength(int bodyLength)
+   {
+      this.bodyLength = bodyLength;
+   }
+
+   public int remaining()
+   {
+      return getMessagingBuffer().remaining();
+   }
+
+   public int limit()
+   {
+      return getMessagingBuffer().limit() - HEADER_SIZE;
+   }
+
+   public void writeHeaders()
+   {
+      getMessagingBuffer().putInt(0, getLength() - SIZE_INT);
+      //we initially wrote true anyway
+      if(!isLastPacket())
+      {
+         getMessagingBuffer().putBoolean(SIZE_INT, isLastPacket());
+      }
+   }
+
+   public PacketFragment createNextPacketFragment(int packetFragmentSize)
+   {
+      lastPacket = false;
+      return new PacketFragmentImpl(packetId, correlationId + 1,
+            getMessagingBuffer().createNewBuffer(packetFragmentSize));
+   }
+
+   public void putBoolean(int pos, boolean val)
+   {
+      getMessagingBuffer().putBoolean(pos + HEADER_SIZE, val);
+   }
+
+   public void putInt(int pos, int val)
+   {
+      getMessagingBuffer().putInt(pos + HEADER_SIZE,  val);
+   }
+
+   public void rewind()
+   {
+      getMessagingBuffer().rewind();
+   }
+
+   public void flip()
+   {
+      getMessagingBuffer().flip();
+   }
+
+   public void position(int position)
+   {
+      getMessagingBuffer().position(position);
+   }
+}

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -31,12 +31,7 @@
 import org.jboss.messaging.core.client.Location;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.FailureListener;
-import org.jboss.messaging.core.remoting.MessagingBuffer;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.PacketHandler;
-import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.*;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.spi.Connection;
@@ -82,13 +77,16 @@
 
    private volatile boolean destroyed;
 
+   private PacketAssembler packetAssembler;
+
    // Constructors
    // ---------------------------------------------------------------------------------
 
    public RemotingConnectionImpl(final Connection transportConnection,
          final PacketDispatcher dispatcher, final Location location,
          final long blockingCallTimeout, final long pingPeriod,
-         final ScheduledExecutorService pingExecutor)
+         final ScheduledExecutorService pingExecutor,
+         final PacketAssembler packetAssembler)
    {
       this.transportConnection = transportConnection;
 
@@ -98,6 +96,8 @@
 
       this.blockingCallTimeout = blockingCallTimeout;
 
+      this.packetAssembler = packetAssembler;
+
       pinger = new Pinger();
 
       pongHandler = new PongHandler(dispatcher.generateID());
@@ -106,11 +106,12 @@
 
       future = pingExecutor.scheduleWithFixedDelay(pinger, pingPeriod, pingPeriod,
                                                    TimeUnit.MILLISECONDS);
+
    }
 
    public RemotingConnectionImpl(final Connection transportConnection,
          final PacketDispatcher dispatcher, final Location location,
-         final long blockingCallTimeout)
+         final long blockingCallTimeout, PacketAssembler packetAssembler)
    {
       this.transportConnection = transportConnection;
 
@@ -119,6 +120,8 @@
       this.location = location;
 
       this.blockingCallTimeout = blockingCallTimeout;
+
+      this.packetAssembler = packetAssembler;
    }
 
    // Public
@@ -300,11 +303,9 @@
          throw new IllegalStateException("Cannot write packet to connection, it is destroyed");
       }
       
-      MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
+      MessagingBuffer buffer = transportConnection.createBuffer(packetAssembler.getInitialPacketFragmentSize());
 
-      packet.encode(buffer);
-
-      transportConnection.write(buffer);
+      packetAssembler.disAssemble(buffer, transportConnection, packet);
    }
   
 

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java	2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -39,10 +39,7 @@
 import java.util.concurrent.ExecutorService;
 
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.MessagingBuffer;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.*;
 import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
@@ -105,7 +102,9 @@
    
    private final ConcurrentMap<Long, Long> lastPings = new ConcurrentHashMap<Long, Long>();
 
-   public RemotingHandlerImpl(final PacketDispatcher dispatcher, final ExecutorService executorService)                              
+   private PacketAssembler packetAssembler;
+
+   public RemotingHandlerImpl(final PacketDispatcher dispatcher, final ExecutorService executorService, final PacketAssembler packetAssembler)
    {
       if (dispatcher == null)
       {
@@ -122,6 +121,7 @@
       {
          executorFactory = null;
       }
+      this.packetAssembler = packetAssembler;
    }
    
    public Set<Long> scanForFailedConnections(final long expirePeriod)
@@ -145,7 +145,12 @@
    
    public void bufferReceived(final long connectionID, final MessagingBuffer buffer) throws Exception
    {
-      final Packet packet = decode(connectionID, buffer);
+      final Packet packet = packetAssembler.assemble(buffer, this, connectionID);
+
+      if(packet == null)
+      {
+         return;
+      }
                
       if (executorFactory != null)
       {
@@ -199,7 +204,7 @@
       {
          return -1;
       }
-      
+
       return length;
    }
    

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java	2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -36,12 +36,7 @@
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
-import org.jboss.messaging.core.remoting.Interceptor;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.RemotingHandler;
-import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.remoting.*;
 import org.jboss.messaging.core.remoting.spi.Acceptor;
 import org.jboss.messaging.core.remoting.spi.AcceptorFactory;
 import org.jboss.messaging.core.remoting.spi.Connection;
@@ -83,7 +78,10 @@
    private final Timer failedConnectionTimer = new Timer(true);
    
    private TimerTask failedConnectionsTask;
-      
+
+   private PacketAssembler packetAssembler;
+   private MessageCache messageCache;
+
    // Static --------------------------------------------------------
    
    // Constructors --------------------------------------------------
@@ -97,9 +95,12 @@
       dispatcher = new PacketDispatcherImpl(null);
 
       remotingExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-session-ordering-threads"));
+
+      messageCache = new InMemoryMessageCache();
+
+      packetAssembler = new PacketAssemblerImpl(config.getInitialPacketFragmentSize(), config.getPacketFragmentSize(), messageCache);
+      handler = new RemotingHandlerImpl(dispatcher, remotingExecutor, packetAssembler);
       
-      handler = new RemotingHandlerImpl(dispatcher, remotingExecutor);
-      
       long pingPeriod = config.getConnectionParams().getPingInterval();
       
       if (pingPeriod != -1)
@@ -232,7 +233,7 @@
    public void connectionCreated(final Connection connection)
    {
       RemotingConnection rc =
-         new RemotingConnectionImpl(connection, dispatcher, null, config.getConnectionParams().getCallTimeout());
+         new RemotingConnectionImpl(connection, dispatcher, null, config.getConnectionParams().getCallTimeout(), packetAssembler);
       
       this.connections.put(connection.getID(), rc);
    }
@@ -240,7 +241,9 @@
    public void connectionDestroyed(long connectionID)
    {
       handler.removeLastPing(connectionID);
-      
+
+      messageCache.clear(connectionID);
+
       if (connections.remove(connectionID) == null)
       {
          throw new IllegalStateException("Cannot find connection with id " + connectionID);
@@ -250,6 +253,8 @@
    public void connectionException(long connectionID, MessagingException me)
    {
       RemotingConnection rc = connections.remove(connectionID);
+
+      messageCache.clear(connectionID);
       
       if (rc == null)
       {

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java	2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -242,6 +242,18 @@
       }
    }
 
+   public void putBoolean(int pos, boolean b)
+   {
+      if (b)
+      {
+         buffer.put(pos, TRUE);
+      }
+      else
+      {
+         buffer.put(pos, FALSE);
+      }
+   }
+
    public boolean getBoolean()
    {
       byte b = buffer.get();

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java	2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaProtocolCodecFilter.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -84,15 +84,15 @@
    public boolean doDecode(final IoSession session, final IoBuffer in, final ProtocolDecoderOutput out) throws Exception
    {
       //TODO - we can avoid this entirely if we maintain fragmented packets in the handler
-      
+
       int start = in.position();
       
       int length = handler.isReadyToHandle(new IoBufferWrapper(in));
-      
+
+      in.position(start);
+
       if (length == -1)
-      {         
-         in.position(start);
-         
+      {
          return false;
       }
 

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-07-28 17:17:50 UTC (rev 4739)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-07-29 13:42:17 UTC (rev 4744)
@@ -189,7 +189,6 @@
    public void encode(MessagingBuffer buffer)
    {      
       //The standard header fields
-      buffer.putInt(0); //The length gets filled in at the end
       buffer.putByte(type); 
       buffer.putInt(commandID);
       buffer.putLong(responseTargetID);
@@ -197,13 +196,6 @@
       buffer.putLong(executorID);
 
       encodeBody(buffer);
-      
-      //The length doesn't include the actual length byte
-      int len = buffer.position() - DataConstants.SIZE_INT;
-      
-      buffer.putInt(0, len);
-      
-      buffer.flip();
    }
 
    public void decode(final MessagingBuffer buffer)




More information about the jboss-cvs-commits mailing list