[hornetq-commits] JBoss hornetq SVN: r9575 - in branches/Branch_Large_Message_Compression: src/main/org/hornetq/api/core and 19 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Aug 20 18:30:34 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-08-20 18:30:33 -0400 (Fri, 20 Aug 2010)
New Revision: 9575

Added:
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/HornetQBufferInputStream.java
   branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/CompressedLargeMessageTest.java
   branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java
Modified:
   branches/Branch_Large_Message_Compression/src/config/common/schema/hornetq-jms.xsd
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/Message.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/HornetQClient.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/DelegatingSession.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManager.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/JMSServerManager.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
   branches/Branch_Large_Message_Compression/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
   branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
   branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
   branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
   branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
   branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
   branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
   branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java
   branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
   branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
   branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
   branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java
   branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
   branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/util/JMSTestBase.java
Log:
Initial implementation

Modified: branches/Branch_Large_Message_Compression/src/config/common/schema/hornetq-jms.xsd
===================================================================
--- branches/Branch_Large_Message_Compression/src/config/common/schema/hornetq-jms.xsd	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/config/common/schema/hornetq-jms.xsd	2010-08-20 22:30:33 UTC (rev 9575)
@@ -75,7 +75,11 @@
             </xsd:element>
    			<xsd:element name="min-large-message-size" type="xsd:long"
    				maxOccurs="1" minOccurs="0">
-   			</xsd:element>   			          
+   			</xsd:element>   			 
+   			<xsd:element name="compress-large-messages" type="xsd:boolean"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+   			         
             <xsd:element name="client-id" type="xsd:string"
                 maxOccurs="1" minOccurs="0">
             </xsd:element>            

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/Message.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/Message.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/Message.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -65,6 +65,8 @@
    public static final SimpleString HDR_ORIG_MESSAGE_ID = new SimpleString("_HQ_ORIG_MESSAGE_ID");
 
    public static final SimpleString HDR_GROUP_ID = new SimpleString("_HQ_GROUP_ID");
+   
+   public static final SimpleString HDR_LARGE_COMPRESSED = new SimpleString("_HQ_LARGE_COMPRESSED");
 
    public static final SimpleString HDR_SCHEDULED_DELIVERY_TIME = new SimpleString("_HQ_SCHED_DELIVERY");
 

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/ClientSessionFactory.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/ClientSessionFactory.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -252,6 +252,27 @@
     * @param minLargeMessageSize large message size threshold in bytes
     */
    void setMinLargeMessageSize(int minLargeMessageSize);
+   
+   
+   /** 
+    * If this attribute is set to true, the message body will be compressed when sent as large message.
+    * 
+    * the compression will be done using the GZIP protocol.
+    * 
+    * 
+    * @param compressLargeMessage
+    */
+   void setCompressLargeMessages(boolean compressLargeMessage);
+   
+   /** 
+    * Returns whether the message body will be compressed when sent as a large message.
+    * 
+    * The compression will be done using the GZIP protocol.
+    * 
+    * 
+    * @return true if large message bodies are compressed
+    */
+   boolean isCompressLargeMessages();
 
    /**
     * Returns the window size for flow control of the consumers created through this factory.

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/HornetQClient.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/HornetQClient.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -41,6 +41,8 @@
    // Any message beyond this size is considered a large message (to be sent in chunks)
    
    public static final int DEFAULT_MIN_LARGE_MESSAGE_SIZE = 100 * 1024;
+   
+   public static final boolean DEFAULT_COMPRESS_LARGE_MESSAGES = false;
 
    public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;
 

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -30,6 +30,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.utils.DecompressedLargeMessageBuffer;
 import org.hornetq.utils.Future;
 import org.hornetq.utils.PriorityLinkedList;
 import org.hornetq.utils.PriorityLinkedListImpl;
@@ -444,6 +445,11 @@
    // ClientConsumerInternal implementation
    // --------------------------------------------------------------
 
+   public ClientSessionInternal getSession()
+   {
+      return session;
+   }
+   
    public SessionQueueQueryResponseMessage getQueueInfo()
    {
       return queueInfo;
@@ -544,7 +550,14 @@
 
       currentLargeMessageBuffer = new LargeMessageBufferImpl(this, packet.getLargeMessageSize(), 60, largeMessageCache);
 
-      currentChunkMessage.setBuffer(currentLargeMessageBuffer);
+      if (currentChunkMessage.isCompressed())
+      {
+         currentChunkMessage.setBuffer(new DecompressedLargeMessageBuffer(currentLargeMessageBuffer, session.getThreadPool()));
+      }
+      else
+      {
+         currentChunkMessage.setBuffer(currentLargeMessageBuffer);
+      }
 
       currentChunkMessage.setFlowControlSize(0);
 

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -67,4 +67,6 @@
    void start();
    
    SessionQueueQueryResponseMessage getQueueInfo();
+   
+   ClientSessionInternal getSession();
 }

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -21,11 +21,13 @@
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.GZipUtil;
+import org.hornetq.utils.HornetQBufferInputStream;
 
 /**
  * 
@@ -117,6 +119,11 @@
    {
       return largeMessage;
    }
+   
+   public boolean isCompressed()
+   {
+      return properties.getBooleanProperty(Message.HDR_LARGE_COMPRESSED);
+   }
 
    /**
     * @param largeMessage the largeMessage to set
@@ -142,7 +149,6 @@
              "]";
    }
 
-   // FIXME - only used for large messages - move it!
    /* (non-Javadoc)
     * @see org.hornetq.api.core.client.ClientMessage#saveToOutputStream(java.io.OutputStream)
     */
@@ -150,7 +156,7 @@
    {
       if (largeMessage)
       {
-         ((LargeMessageBufferInternal)getWholeBuffer()).saveBuffer(out);
+          ((LargeMessageBufferInternal)getWholeBuffer()).saveBuffer(out);
       }
       else
       {

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -44,4 +44,6 @@
    void discardLargeBody();
 
    void setBuffer(HornetQBuffer buffer);
+   
+   boolean isCompressed();
 }

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -13,8 +13,16 @@
 
 package org.hornetq.core.client.impl;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.concurrent.Executor;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
@@ -28,6 +36,8 @@
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.utils.GZipUtil;
+import org.hornetq.utils.HornetQBufferInputStream;
 import org.hornetq.utils.TokenBucketLimiter;
 import org.hornetq.utils.UUIDGenerator;
 
@@ -150,7 +160,7 @@
       {
          return;
       }
-            
+
       doCleanup();
    }
 
@@ -190,7 +200,7 @@
    {
       return credits;
    }
-   
+
    // Protected ------------------------------------------------------------------------------------
 
    // Package Private ------------------------------------------------------------------------------
@@ -203,7 +213,7 @@
       {
          session.returnCredits(address);
       }
-      
+
       session.removeProducer(this);
 
       closed = true;
@@ -212,12 +222,13 @@
    private void doSend(final SimpleString address, final Message msg) throws HornetQException
    {
       MessageInternal msgI = (MessageInternal)msg;
-      
+
       ClientProducerCredits theCredits;
-      
+
       boolean isLarge;
 
-      if (msgI.getBodyInputStream() != null || msgI.isLargeMessage() || msgI.getBodyBuffer().writerIndex() > minLargeMessageSize)
+      if (msgI.getBodyInputStream() != null || msgI.isLargeMessage() ||
+          msgI.getBodyBuffer().writerIndex() > minLargeMessageSize)
       {
          isLarge = true;
       }
@@ -236,7 +247,7 @@
          {
             msg.setAddress(address);
          }
-         
+
          // Anonymous
          theCredits = session.getCredits(address, true);
       }
@@ -250,7 +261,7 @@
          {
             msg.setAddress(this.address);
          }
-         
+
          theCredits = credits;
       }
 
@@ -270,8 +281,6 @@
 
       session.workDone();
 
-      
-
       if (isLarge)
       {
          largeMessageSend(sendBlocking, msgI, theCredits);
@@ -322,8 +331,16 @@
     * @param msgI
     * @throws HornetQException
     */
-   private void largeMessageSend(final boolean sendBlocking, final MessageInternal msgI, final ClientProducerCredits credits) throws HornetQException
+   private void largeMessageSend(final boolean sendBlocking,
+                                 final MessageInternal msgI,
+                                 final ClientProducerCredits credits) throws HornetQException
    {
+
+      if (session.isCompressLargeMessages())
+      {
+         msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, true);
+      }
+
       int headerSize = msgI.getHeadersAndPropertiesEncodeSize();
 
       if (headerSize >= minLargeMessageSize)
@@ -341,7 +358,6 @@
       HornetQBuffer headerBuffer = HornetQBuffers.fixedBuffer(headerSize);
 
       msgI.encodeHeadersAndProperties(headerBuffer);
-
       SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(headerBuffer.toByteBuffer().array());
 
       channel.send(initialChunk);
@@ -358,7 +374,7 @@
 
       if (input != null)
       {
-         largeMessageSendStreamed(sendBlocking, input, credits);
+         largeMessageSendStreamed(sendBlocking, msgI, input, credits);
       }
       else
       {
@@ -375,72 +391,29 @@
                                          final MessageInternal msgI,
                                          final ClientProducerCredits credits) throws HornetQException
    {
-      BodyEncoder context = msgI.getBodyEncoder();
-
-      final long bodySize = context.getLargeBodySize();
-
-      context.open();
-      try
-      {
-
-         for (int pos = 0; pos < bodySize;)
-         {
-            final boolean lastChunk;
-
-            final int chunkLength = Math.min((int)(bodySize - pos), minLargeMessageSize);
-
-            final HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(chunkLength);
-
-            context.encode(bodyBuffer, chunkLength);
-
-            pos += chunkLength;
-
-            lastChunk = pos >= bodySize;
-
-            final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.toByteBuffer()
-                                                                                                      .array(),
-                                                                                            !lastChunk,
-                                                                                            lastChunk && sendBlocking);
-
-            if (sendBlocking && lastChunk)
-            {
-               // When sending it blocking, only the last chunk will be blocking.
-               channel.sendBlocking(chunk);
-            }
-            else
-            {
-               channel.send(chunk);
-            }
-
-            try
-            {
-               credits.acquireCredits(chunk.getPacketSize());
-            }
-            catch (InterruptedException e)
-            {
-            }
-         }
-      }
-      finally
-      {
-         context.close();
-      }
+      msgI.getBodyBuffer().readerIndex(0);
+      largeMessageSendStreamed(sendBlocking, msgI, new HornetQBufferInputStream(msgI.getBodyBuffer()), credits);
    }
 
    /**
-    * TODO: This method could be eliminated and 
-    *       combined with {@link ClientProducerImpl#largeMessageSendBuffered(boolean, Message, ClientProducerCredits)}. 
-    *       All that's needed for this is ClientMessage returning the proper BodyEncoder for streamed
     * @param sendBlocking
     * @param input
     * @throws HornetQException
     */
    private void largeMessageSendStreamed(final boolean sendBlocking,
-                                         final InputStream input,
+                                         final MessageInternal msgI,
+                                         final InputStream inputStreamParameter,
                                          final ClientProducerCredits credits) throws HornetQException
    {
       boolean lastPacket = false;
 
+      InputStream input = inputStreamParameter;
+
+      if (session.isCompressLargeMessages())
+      {
+         input = GZipUtil.pipeGZip(inputStreamParameter, true, session.getThreadPool());
+      }
+
       while (!lastPacket)
       {
          byte[] buff = new byte[minLargeMessageSize];

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -105,6 +105,8 @@
    private long callTimeout;
 
    private int minLargeMessageSize;
+   
+   private boolean compressLargeMessages;
 
    private int consumerWindowSize;
 
@@ -308,6 +310,8 @@
 
       minLargeMessageSize = other.getMinLargeMessageSize();
 
+      compressLargeMessages = other.isCompressLargeMessages();
+
       consumerWindowSize = other.getConsumerWindowSize();
 
       consumerMaxRate = other.getConsumerMaxRate();
@@ -370,6 +374,8 @@
       callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
 
       minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+      
+      compressLargeMessages = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;      
 
       consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
 
@@ -465,16 +471,6 @@
 
    // ClientSessionFactory implementation------------------------------------------------------------
 
-   public synchronized boolean isCacheLargeMessagesClient()
-   {
-      return cacheLargeMessagesClient;
-   }
-
-   public synchronized void setCacheLargeMessagesClient(final boolean cached)
-   {
-      cacheLargeMessagesClient = cached;
-   }
-
    public synchronized List<Pair<TransportConfiguration, TransportConfiguration>> getStaticConnectors()
    {
       return staticConnectors;
@@ -531,10 +527,31 @@
       this.minLargeMessageSize = minLargeMessageSize;
    }
 
+   public synchronized boolean isCacheLargeMessagesClient()
+   {
+      checkWrite();
+      return cacheLargeMessagesClient;
+   }
+
+   public synchronized void setCacheLargeMessagesClient(final boolean cached)
+   {
+      cacheLargeMessagesClient = cached;
+   }
+
    public synchronized int getConsumerWindowSize()
    {
       return consumerWindowSize;
    }
+   
+   public synchronized void setCompressLargeMessages(final boolean compress)
+   {
+      this.compressLargeMessages = compress;
+   }
+   
+   public synchronized boolean isCompressLargeMessages()
+   {
+      return compressLargeMessages;
+   }
 
    public synchronized void setConsumerWindowSize(final int consumerWindowSize)
    {
@@ -1129,6 +1146,7 @@
                                                                ackBatchSize,
                                                                cacheLargeMessagesClient,
                                                                minLargeMessageSize,
+                                                               compressLargeMessages,
                                                                blockOnAcknowledge,
                                                                autoGroup,
                                                                confirmationWindowSize,

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -154,6 +154,8 @@
    private final boolean blockOnDurableSend;
 
    private final int minLargeMessageSize;
+   
+   private final boolean compressLargeMessages;
 
    private volatile int initialMessagePacketSize;
 
@@ -204,6 +206,7 @@
                             final boolean blockOnDurableSend,
                             final boolean cacheLargeMessageClient,
                             final int minLargeMessageSize,
+                            final boolean compressLargeMessages,
                             final int initialMessagePacketSize,
                             final String groupID,
                             final CoreRemotingConnection remotingConnection,
@@ -256,6 +259,8 @@
       this.cacheLargeMessageClient = cacheLargeMessageClient;
 
       this.minLargeMessageSize = minLargeMessageSize;
+      
+      this.compressLargeMessages = compressLargeMessages;
 
       this.initialMessagePacketSize = initialMessagePacketSize;
 
@@ -267,6 +272,15 @@
    // ClientSession implementation
    // -----------------------------------------------------------------
 
+   /**
+    * This will be used for instance when compressing large messages.
+    * The compression has to be done through a PipedOutputStream, and that needs to be done on a different thread.
+    */
+   public Executor getThreadPool()
+   {
+      return failoverManager.getThreadPool();
+   }
+   
    public void createQueue(final SimpleString address, final SimpleString queueName) throws HornetQException
    {
       internalCreateQueue(address, queueName, null, false, false);
@@ -664,6 +678,11 @@
    {
       return minLargeMessageSize;
    }
+   
+   public boolean isCompressLargeMessages()
+   {
+      return compressLargeMessages;
+   }
 
    /**
     * @return the cacheLargeMessageClient

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -13,6 +13,8 @@
 
 package org.hornetq.core.client.impl;
 
+import java.util.concurrent.Executor;
+
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
@@ -39,6 +41,8 @@
    boolean isCacheLargeMessageClient();
 
    int getMinLargeMessageSize();
+   
+   boolean isCompressLargeMessages();
 
    void expire(long consumerID, long messageID) throws HornetQException;
 
@@ -85,4 +89,6 @@
    void setAddress(Message message, SimpleString address);
    
    void setPacketSize(int packetSize);
+   
+   Executor getThreadPool();
 }

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -14,6 +14,7 @@
 package org.hornetq.core.client.impl;
 
 import java.util.Set;
+import java.util.concurrent.Executor;
 
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
@@ -555,4 +556,20 @@
    {
       session.setPacketSize(packetSize);
    }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.client.impl.ClientSessionInternal#isCompressLargeMessages()
+    */
+   public boolean isCompressLargeMessages()
+   {
+      return session.isCompressLargeMessages();
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.client.impl.ClientSessionInternal#getThreadPool()
+    */
+   public Executor getThreadPool()
+   {
+      return session.getThreadPool();
+   }
 }

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManager.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManager.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManager.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -13,6 +13,8 @@
 
 package org.hornetq.core.client.impl;
 
+import java.util.concurrent.Executor;
+
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.SessionFailureListener;
@@ -38,6 +40,7 @@
                                final int ackBatchSize,
                                final boolean cacheLargeMessageClient,
                                final int minLargeMessageSize,
+                               final boolean compressLargeMessages,
                                final boolean blockOnAcknowledge,
                                final boolean autoGroup,
                                final int confirmationWindowSize,
@@ -63,4 +66,6 @@
    boolean removeFailureListener(SessionFailureListener listener);
 
    void causeExit();
+   
+   Executor getThreadPool();
 }

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -297,8 +297,13 @@
       handleConnectionFailure(connectionID, me);
    }
 
-   // ConnectionManager implementation ------------------------------------------------------------------
+   // FailoverManager implementation ------------------------------------------------------------------
 
+   public Executor getThreadPool()
+   {
+      return threadPool;
+   }
+   
    public ClientSession createSession(final String username,
                                       final String password,
                                       final boolean xa,
@@ -308,6 +313,7 @@
                                       final int ackBatchSize,
                                       final boolean cacheLargeMessageClient,
                                       final int minLargeMessageSize,
+                                      final boolean compressLargeMessages,
                                       final boolean blockOnAcknowledge,
                                       final boolean autoGroup,
                                       final int confWindowSize,
@@ -457,6 +463,7 @@
                                                                      blockOnDurableSend,
                                                                      cacheLargeMessageClient,
                                                                      minLargeMessageSize,
+                                                                     compressLargeMessages,
                                                                      initialMessagePacketSize,
                                                                      groupID,
                                                                      theConnection,

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -17,6 +17,7 @@
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
 
 /**
  * A LargeMessageBufferInternal
@@ -55,6 +56,8 @@
     * Saves this buffer to the specified output.
     */
    void saveBuffer(final OutputStream output) throws HornetQException;
+   
+   public void addPacket(final SessionReceiveContinuationMessage packet);
 
    /**
     * Waits for the completion for the specified waiting time (in milliseconds).

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -648,6 +648,13 @@
       }
    }
 
+   /**
+    * Setter for the large-message compression flag.
+    * <p>
+    * NOTE(review): the body is empty, so the supplied value is currently discarded.
+    * TODO confirm whether it should be stored like the other connection factory settings.
+    *
+    * @param compressLargeMessages requested compression setting (ignored for now)
+    */
+   public void setCompressLargeMessages(boolean compressLargeMessages)
+   {
+   }
+
    // Inner classes --------------------------------------------------------------------------------
 
 }

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/JMSServerManager.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/JMSServerManager.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/JMSServerManager.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -198,6 +198,7 @@
                                 long callTimeout,
                                 boolean cacheLargeMessagesClient,
                                 int minLargeMessageSize,
+                                boolean compressLargeMessage,
                                 int consumerWindowSize,
                                 int consumerMaxRate,
                                 int confirmationWindowSize,
@@ -235,6 +236,7 @@
                                 long callTimeout,
                                 boolean cacheLargeMessagesClient,
                                 int minLargeMessageSize,
+                                boolean compressLargeMessages,
                                 int consumerWindowSize,
                                 int consumerMaxRate,
                                 int confirmationWindowSize,

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -106,6 +106,10 @@
    int getMinLargeMessageSize();
 
    void setMinLargeMessageSize(int minLargeMessageSize);
+   
+   boolean isCompressLargeMessages();
+   
+   void setCompressLargeMessages(boolean compress);   
 
    int getConsumerWindowSize();
 

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -66,6 +66,8 @@
    private boolean cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
 
    private int minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+   
+   private boolean compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
 
    private int consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
 
@@ -306,6 +308,16 @@
       this.minLargeMessageSize = minLargeMessageSize;
    }
 
+   /**
+    * @return the current value of the {@code compressLargeMessage} field
+    */
+   public boolean isCompressLargeMessages()
+   {
+      return compressLargeMessage;
+   }
+
+   /**
+    * Stores the given flag in the {@code compressLargeMessage} field.
+    *
+    * @param compress new value of the flag
+    */
+   public void setCompressLargeMessages(final boolean compress)
+   {
+      this.compressLargeMessage = compress;
+   }
+
    public int getConsumerWindowSize()
    {
       return consumerWindowSize;

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -254,6 +254,11 @@
                                                                 "min-large-message-size",
                                                                 HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
                                                                 Validators.GT_ZERO);
+      
+      boolean compressLargeMessages = XMLConfigurationUtil.getBoolean(e,
+                                                                "compress-large-messages",
+                                                                HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES);
+      
       boolean blockOnAcknowledge = XMLConfigurationUtil.getBoolean(e,
                                                                    "block-on-acknowledge",
                                                                    HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE);
@@ -387,6 +392,7 @@
       cfConfig.setCallTimeout(callTimeout);
       cfConfig.setCacheLargeMessagesClient(cacheLargeMessagesClient);
       cfConfig.setMinLargeMessageSize(minLargeMessageSize);
+      cfConfig.setCompressLargeMessages(compressLargeMessages);
       cfConfig.setConsumerWindowSize(consumerWindowSize);
       cfConfig.setConsumerMaxRate(consumerMaxRate);
       cfConfig.setConfirmationWindowSize(confirmationWindowSize);

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -711,6 +711,7 @@
                                                     final long callTimeout,
                                                     final boolean cacheLargeMessagesClient,
                                                     final int minLargeMessageSize,
+                                                    final boolean compressLargeMessage,
                                                     final int consumerWindowSize,
                                                     final int consumerMaxRate,
                                                     final int confirmationWindowSize,
@@ -747,6 +748,7 @@
          configuration.setCallTimeout(callTimeout);
          configuration.setCacheLargeMessagesClient(cacheLargeMessagesClient);
          configuration.setMinLargeMessageSize(minLargeMessageSize);
+         configuration.setCompressLargeMessages(compressLargeMessage);
          configuration.setConsumerWindowSize(consumerWindowSize);
          configuration.setConsumerMaxRate(consumerMaxRate);
          configuration.setConfirmationWindowSize(confirmationWindowSize);
@@ -786,6 +788,7 @@
                                                     final long callTimeout,
                                                     final boolean cacheLargeMessagesClient,
                                                     final int minLargeMessageSize,
+                                                    final boolean compressLargeMessages,
                                                     final int consumerWindowSize,
                                                     final int consumerMaxRate,
                                                     final int confirmationWindowSize,
@@ -827,6 +830,7 @@
          configuration.setCallTimeout(callTimeout);
          configuration.setCacheLargeMessagesClient(cacheLargeMessagesClient);
          configuration.setMinLargeMessageSize(minLargeMessageSize);
+         configuration.setCompressLargeMessages(compressLargeMessages);
          configuration.setConsumerWindowSize(consumerWindowSize);
          configuration.setConsumerMaxRate(consumerMaxRate);
          configuration.setConfirmationWindowSize(confirmationWindowSize);
@@ -926,6 +930,7 @@
                                                                     final long callTimeout,
                                                                     final boolean cacheLargeMessagesClient,
                                                                     final int minLargeMessageSize,
+                                                                    final boolean compressLargeMessages,
                                                                     final int consumerWindowSize,
                                                                     final int consumerMaxRate,
                                                                     final int confirmationWindowSize,
@@ -964,6 +969,7 @@
          cf.setCallTimeout(callTimeout);
          cf.setCacheLargeMessagesClient(cacheLargeMessagesClient);
          cf.setMinLargeMessageSize(minLargeMessageSize);
+         cf.setCompressLargeMessages(compressLargeMessages);
          cf.setConsumerWindowSize(consumerWindowSize);
          cf.setConsumerMaxRate(consumerMaxRate);
          cf.setConfirmationWindowSize(confirmationWindowSize);
@@ -999,6 +1005,7 @@
                                                                     final long callTimeout,
                                                                     final boolean cacheLargeMessagesClient,
                                                                     final int minLargeMessageSize,
+                                                                    final boolean compressLargeMessages,
                                                                     final int consumerWindowSize,
                                                                     final int consumerMaxRate,
                                                                     final int confirmationWindowSize,
@@ -1034,6 +1041,7 @@
          cf.setCallTimeout(callTimeout);
          cf.setCacheLargeMessagesClient(cacheLargeMessagesClient);
          cf.setMinLargeMessageSize(minLargeMessageSize);
+         cf.setCompressLargeMessages(compressLargeMessages);
          cf.setConsumerWindowSize(consumerWindowSize);
          cf.setConsumerMaxRate(consumerMaxRate);
          cf.setConfirmationWindowSize(confirmationWindowSize);
@@ -1169,6 +1177,7 @@
                                               cfConfig.getCallTimeout(),
                                               cfConfig.isCacheLargeMessagesClient(),
                                               cfConfig.getMinLargeMessageSize(),
+                                              cfConfig.isCompressLargeMessages(),
                                               cfConfig.getConsumerWindowSize(),
                                               cfConfig.getConsumerMaxRate(),
                                               cfConfig.getConfirmationWindowSize(),
@@ -1203,6 +1212,7 @@
                                               cfConfig.getCallTimeout(),
                                               cfConfig.isCacheLargeMessagesClient(),
                                               cfConfig.getMinLargeMessageSize(),
+                                              cfConfig.isCompressLargeMessages(),
                                               cfConfig.getConsumerWindowSize(),
                                               cfConfig.getConsumerMaxRate(),
                                               cfConfig.getConfirmationWindowSize(),

Added: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java	                        (rev 0)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -0,0 +1,1116 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.util.concurrent.Executor;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.client.impl.LargeMessageBufferImpl;
+import org.hornetq.core.client.impl.LargeMessageBufferInternal;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
+import org.jboss.netty.buffer.ChannelBuffer;
+
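+/**
+ * A DecompressedLargeMessageBuffer
+ *
+ * Wraps another {@link LargeMessageBufferInternal}. Sequential reads are served from a stream
+ * obtained by passing the wrapped buffer's content through {@link GZipUtil}; positional
+ * (index-based) reads and all write operations are rejected.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */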
+public class DecompressedLargeMessageBuffer implements LargeMessageBufferInternal
+{
+
+   // Constants -----------------------------------------------------
+
+   /**
+    * Message carried by the {@link IllegalAccessError} thrown from unsupported operations.
+    */
+   private static final String OPERATION_NOT_SUPPORTED = "Operation not supported";
+
+   private static final String READ_ONLY_ERROR_MESSAGE = "This is a read-only buffer, setOperations are not supported";
+
+   // Attributes ----------------------------------------------------
+
+
+   final LargeMessageBufferInternal bufferDelegate;
+   
+   final Executor threadPool;
+   
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public DecompressedLargeMessageBuffer(final LargeMessageBufferInternal bufferDelegate, final Executor threadPool)
+   {
+      this.bufferDelegate = bufferDelegate;
+      this.threadPool = threadPool;
+   }
+
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Forwards to {@code discardUnusedPackets()} on the wrapped buffer.
+    */
+   public void discardUnusedPackets()
+   {
+      bufferDelegate.discardUnusedPackets();
+   }
+
+   /**
+    * Hands the continuation packet to the wrapped buffer; no processing is done at this level.
+    * @param packet the packet passed on to {@code bufferDelegate.addPacket}
+    */
+   public void addPacket(final SessionReceiveContinuationMessage packet)
+   {
+      bufferDelegate.addPacket(packet);
+   }
+
+   /**
+    * Cancels the wrapped buffer. Holds this object's monitor while delegating.
+    */
+   public synchronized void cancel()
+   {
+      bufferDelegate.cancel();
+   }
+
+   /**
+    * Closes this buffer by cancelling the wrapped buffer.
+    * <p>
+    * NOTE(review): this calls {@code bufferDelegate.cancel()}, exactly as {@code cancel()} does,
+    * rather than a close operation on the delegate. Confirm that this is intentional.
+    */
+   public synchronized void close()
+   {
+      bufferDelegate.cancel();
+   }
+
+   /**
+    * Connects a pipe pair, passes the read end to {@code GZipUtil.pipeGZip}, and installs the
+    * write end as the wrapped buffer's output stream.
+    * <p>
+    * NOTE(review): {@code output} is never referenced in this method, and the stream returned by
+    * {@code pipeGZip} is discarded, so nothing visible here ever writes to the caller's stream.
+    * TODO confirm how the processed bytes are meant to reach {@code output}.
+    *
+    * @param output the caller's destination stream (currently unused)
+    * @throws HornetQException with code {@code LARGE_MESSAGE_ERROR_BODY} when an
+    *         {@link IOException} is raised inside the try block
+    */
+   public void setOutputStream(final OutputStream output) throws HornetQException
+   {
+      try
+      {
+         PipedOutputStream pipeOut = new PipedOutputStream();
+         PipedInputStream pipeIn = new PipedInputStream();
+         
+         pipeOut.connect(pipeIn);
+         
+         // NOTE(review): return value (used as an InputStream by getStream()) is dropped here.
+         GZipUtil.pipeGZip(pipeIn, false, threadPool);
+         
+         bufferDelegate.setOutputStream(pipeOut);
+      }
+      catch (IOException e)
+      {
+         throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, e.getMessage(), e);
+      }
+   }
+
+   /**
+    * Installs {@code output} through {@code setOutputStream} and then calls
+    * {@code waitCompletion(0)}; per that method's documentation, 0 means wait forever.
+    *
+    * @param output stream handed to {@code setOutputStream}
+    * @throws HornetQException propagated from {@code setOutputStream} or {@code waitCompletion}
+    */
+   public synchronized void saveBuffer(final OutputStream output) throws HornetQException
+   {
+      setOutputStream(output);
+      waitCompletion(0);
+   }
+
+   /**
+    * Delegates to {@code waitCompletion} on the wrapped buffer while holding this object's monitor.
+    *
+    * @param timeWait Milliseconds to Wait. 0 means forever
+    * @return the value returned by the wrapped buffer
+    * @throws HornetQException propagated from the wrapped buffer
+    */
+   public synchronized boolean waitCompletion(final long timeWait) throws HornetQException
+   {
+      return bufferDelegate.waitCompletion(timeWait);
+   }
+
+   // Channel Buffer Implementation ---------------------------------
+
+   /**
+    * Always fails: this buffer has no backing array to expose.
+    *
+    * @throws IllegalAccessError on every call
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#array()
+    */
+   public byte[] array()
+   {
+      // The message previously named LargeMessageBufferImpl, which is a different class.
+      throw new IllegalAccessError("array not supported on DecompressedLargeMessageBuffer");
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#capacity()
+    *
+    * Always returns -1; no real capacity is computed here.
+    */
+   public int capacity()
+   {
+      return -1;
+   }
+   
+   /** Stream used by the sequential read methods; created on first use by {@code getStream()}. */
+   DataInputStream dataInput = null;
+   
+   /**
+    * Returns the shared read stream, building it on the first call from the wrapped buffer
+    * passed through {@code GZipUtil.pipeGZip}.
+    *
+    * @throws RuntimeException wrapping any exception raised while building the stream
+    */
+   private DataInputStream getStream()
+   {
+      if (dataInput != null)
+      {
+         return dataInput;
+      }
+
+      try
+      {
+         final InputStream rawInput = new HornetQBufferInputStream(bufferDelegate);
+         final InputStream pipedInput = GZipUtil.pipeGZip(rawInput, false, threadPool);
+
+         dataInput = new DataInputStream(pipedInput);
+      }
+      catch (Exception e)
+      {
+         throw new RuntimeException (e.getMessage(), e);
+      }
+
+      return dataInput;
+   }
+   
+   /**
+    * Shared failure path for index-based operations: always throws.
+    *
+    * @throws IllegalStateException on every call
+    */
+   private void positioningNotSupported()
+   {
+      throw new IllegalStateException("Position not supported over compressed large messages");
+   }
+
+   /**
+    * Reads the next byte from the shared read stream.
+    * <p>
+    * Failures are wrapped in {@link IllegalStateException}, as in the other sequential read
+    * methods of this class. It is a {@link RuntimeException} subtype, so callers that catch
+    * {@code RuntimeException} are unaffected.
+    *
+    * @return the byte read
+    * @throws IllegalStateException if the stream cannot be read
+    */
+   public byte readByte()
+   {
+      try
+      {
+         return getStream().readByte();
+      }
+      catch (Exception e)
+      {
+         throw new IllegalStateException(e.getMessage(), e);
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getByte(int)
+    */
+   public byte getByte(final int index)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   private byte getByte(final long index)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, org.hornetq.api.core.buffers.ChannelBuffer, int, int)
+    */
+   public void getBytes(final int index, final HornetQBuffer dst, final int dstIndex, final int length)
+   {
+      positioningNotSupported();
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, org.hornetq.api.core.buffers.ChannelBuffer, int, int)
+    */
+   public void getBytes(final long index, final HornetQBuffer dst, final int dstIndex, final int length)
+   {
+      positioningNotSupported();
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, byte[], int, int)
+    */
+   public void getBytes(final int index, final byte[] dst, final int dstIndex, final int length)
+   {
+      positioningNotSupported();
+   }
+
+   public void getBytes(final long index, final byte[] dst, final int dstIndex, final int length)
+   {
+      positioningNotSupported();
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, java.nio.ByteBuffer)
+    */
+   public void getBytes(final int index, final ByteBuffer dst)
+   {
+      positioningNotSupported();
+   }
+
+   public void getBytes(final long index, final ByteBuffer dst)
+   {
+      positioningNotSupported();
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, java.io.OutputStream, int)
+    */
+   public void getBytes(final int index, final OutputStream out, final int length) throws IOException
+   {
+      positioningNotSupported();
+   }
+
+   public void getBytes(final long index, final OutputStream out, final int length) throws IOException
+   {
+      positioningNotSupported();
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, java.nio.channels.GatheringByteChannel, int)
+    */
+   public int getBytes(final int index, final GatheringByteChannel out, final int length) throws IOException
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getInt(int)
+    */
+   public int getInt(final int index)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   public int getInt(final long index)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getLong(int)
+    */
+   public long getLong(final int index)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   public long getLong(final long index)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getShort(int)
+    */
+   public short getShort(final int index)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   /**
+    * Index-based read; not supported on this buffer.
+    *
+    * @throws IllegalStateException on every call
+    */
+   public short getShort(final long index)
+   {
+      // Same outcome as composing two getByte(long) calls: the first one already throws.
+      positioningNotSupported();
+      return 0;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getUnsignedMedium(int)
+    */
+   public int getUnsignedMedium(final int index)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+   
+   
+
+   public int getUnsignedMedium(final long index)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#setByte(int, byte)
+    */
+   public void setByte(final int index, final byte value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, org.hornetq.api.core.buffers.ChannelBuffer, int, int)
+    */
+   public void setBytes(final int index, final HornetQBuffer src, final int srcIndex, final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, byte[], int, int)
+    */
+   public void setBytes(final int index, final byte[] src, final int srcIndex, final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, java.nio.ByteBuffer)
+    */
+   public void setBytes(final int index, final ByteBuffer src)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, java.io.InputStream, int)
+    */
+   public int setBytes(final int index, final InputStream in, final int length) throws IOException
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, java.nio.channels.ScatteringByteChannel, int)
+    */
+   public int setBytes(final int index, final ScatteringByteChannel in, final int length) throws IOException
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#setInt(int, int)
+    */
+   public void setInt(final int index, final int value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#setLong(int, long)
+    */
+   public void setLong(final int index, final long value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#setMedium(int, int)
+    */
+   public void setMedium(final int index, final int value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#setShort(int, short)
+    */
+   public void setShort(final int index, final short value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#toByteBuffer(int, int)
+    */
+   public ByteBuffer toByteBuffer(final int index, final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#toString(int, int, java.lang.String)
+    */
+   public String toString(final int index, final int length, final String charsetName)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public int readerIndex()
+   {
+      // TODO
+      return 0;
+   }
+
+   public void readerIndex(final int readerIndex)
+   {
+      // TODO
+   }
+
+   public int writerIndex()
+   {
+      // TODO
+      return 0;
+   }
+
+   public long getSize()
+   {
+      // TODO
+      return 0;
+   }
+
+   public void writerIndex(final int writerIndex)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void setIndex(final int readerIndex, final int writerIndex)
+   {
+      positioningNotSupported();
+   }
+
+   public void clear()
+   {
+   }
+
+   public boolean readable()
+   {
+      return true;
+   }
+
+   public boolean writable()
+   {
+      return false;
+   }
+
+   public int readableBytes()
+   {
+      return 1;
+   }
+
+   public int writableBytes()
+   {
+      return 0;
+   }
+
+   public void markReaderIndex()
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void resetReaderIndex()
+   {
+      // TODO: reset positioning if possible
+   }
+
+   public void markWriterIndex()
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void resetWriterIndex()
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void discardReadBytes()
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public short getUnsignedByte(final int index)
+   {
+      return (short)(getByte(index) & 0xFF);
+   }
+
+   public int getUnsignedShort(final int index)
+   {
+      return getShort(index) & 0xFFFF;
+   }
+
+   /**
+    * Sign-extends the 24-bit value obtained from {@code getUnsignedMedium(index)}.
+    */
+   public int getMedium(final int index)
+   {
+      final int unsigned = getUnsignedMedium(index);
+      final boolean negative = (unsigned & 0x800000) != 0;
+      return negative ? unsigned | 0xff000000 : unsigned;
+   }
+
+   public long getUnsignedInt(final int index)
+   {
+      return getInt(index) & 0xFFFFFFFFL;
+   }
+
+   /**
+    * Fills {@code dst} one byte at a time through {@code getByte(int)}.
+    * <p>
+    * NOTE(review): {@code getByte(int)} always throws via {@code positioningNotSupported()}, so
+    * this throws {@link IllegalStateException} for any non-empty {@code dst} and does nothing
+    * for an empty one.
+    */
+   public void getBytes(int index, final byte[] dst)
+   {
+      // TODO: optimize this by using System.arraycopy
+      for (int i = 0; i < dst.length; i++)
+      {
+         dst[i] = getByte(index++);
+      }
+   }
+
+   /**
+    * Fills {@code dst} one byte at a time through {@code getByte(long)}.
+    * <p>
+    * NOTE(review): {@code getByte(long)} always throws via {@code positioningNotSupported()}, so
+    * this throws {@link IllegalStateException} for any non-empty {@code dst} and does nothing
+    * for an empty one.
+    */
+   public void getBytes(long index, final byte[] dst)
+   {
+      // TODO: optimize this by using System.arraycopy
+      for (int i = 0; i < dst.length; i++)
+      {
+         dst[i] = getByte(index++);
+      }
+   }
+
+   public void getBytes(final int index, final HornetQBuffer dst)
+   {
+      getBytes(index, dst, dst.writableBytes());
+   }
+
+   public void getBytes(final int index, final HornetQBuffer dst, final int length)
+   {
+      if (length > dst.writableBytes())
+      {
+         throw new IndexOutOfBoundsException();
+      }
+      getBytes(index, dst, dst.writerIndex(), length);
+      dst.writerIndex(dst.writerIndex() + length);
+   }
+
+   public void setBytes(final int index, final byte[] src)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void setBytes(final int index, final HornetQBuffer src)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void setBytes(final int index, final HornetQBuffer src, final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void setZero(final int index, final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public short readUnsignedByte()
+   {
+      try
+      {
+         return (short)getStream().readUnsignedByte();
+      }
+      catch (Exception e)
+      {
+         throw new IllegalStateException (e.getMessage(), e);
+      }
+   }
+
+   /**
+    * Reads the next two bytes from the shared read stream as a signed short.
+    *
+    * @throws IllegalStateException wrapping any failure while reading
+    */
+   public short readShort()
+   {
+      try
+      {
+         final DataInputStream in = getStream();
+         return in.readShort();
+      }
+      catch (Exception e)
+      {
+         throw new IllegalStateException (e.getMessage(), e);
+      }
+   }
+
+   /**
+    * Reads the next two bytes from the shared read stream as an unsigned 16-bit value.
+    *
+    * @throws IllegalStateException wrapping any failure while reading
+    */
+   public int readUnsignedShort()
+   {
+      try
+      {
+         final DataInputStream in = getStream();
+         return in.readUnsignedShort();
+      }
+      catch (Exception e)
+      {
+         throw new IllegalStateException (e.getMessage(), e);
+      }
+   }
+
+   /**
+    * Reads a 24-bit value via {@code readUnsignedMedium()} and sign-extends it to an int.
+    */
+   public int readMedium()
+   {
+      final int unsigned = readUnsignedMedium();
+      final boolean negative = (unsigned & 0x800000) != 0;
+      return negative ? unsigned | 0xff000000 : unsigned;
+   }
+
+   
+   /**
+    * Reads three bytes, most significant first, and combines them into an unsigned 24-bit value.
+    */
+   public int readUnsignedMedium()
+   {
+      // Bytes are consumed in the same order as before: high, middle, low.
+      final int high = readByte() & 0xff;
+      final int middle = readByte() & 0xff;
+      final int low = readByte() & 0xff;
+      return high << 16 | middle << 8 | low;
+   }
+
+   public int readInt()
+   {
+      try
+      {
+         return getStream().readInt();
+      }
+      catch (Exception e)
+      {
+         throw new IllegalStateException(e.getMessage(), e);
+      }
+   }
+
+   public int readInt(final int pos)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   public long readUnsignedInt()
+   {
+      return readInt() & 0xFFFFFFFFL;
+   }
+
+   public long readLong()
+   {
+      try
+      {
+         return getStream().readLong();
+      }
+      catch (Exception e)
+      {
+         throw new IllegalStateException(e.getMessage(), e);
+      }
+   }
+
+   /**
+    * Reads exactly {@code length} bytes from the shared read stream into {@code dst}, starting
+    * at {@code dstIndex}.
+    * <p>
+    * Uses {@link DataInputStream#readFully(byte[], int, int)} instead of a single {@code read}
+    * call. A plain {@code read} may return fewer bytes than requested, and the previous code
+    * ignored its return value, which could silently leave the tail of {@code dst} unfilled.
+    *
+    * @param dst destination array
+    * @param dstIndex first index in {@code dst} to write to
+    * @param length number of bytes to read
+    * @throws IllegalStateException if reading fails or the stream ends before {@code length}
+    *         bytes are available
+    */
+   public void readBytes(final byte[] dst, final int dstIndex, final int length)
+   {
+      try
+      {
+         getStream().readFully(dst, dstIndex, length);
+      }
+      catch (Exception e)
+      {
+         throw new IllegalStateException(e.getMessage(), e);
+      }
+   }
+
+   public void readBytes(final byte[] dst)
+   {
+      readBytes(dst, 0, dst.length);
+   }
+
+   public void readBytes(final HornetQBuffer dst)
+   {
+      readBytes(dst, dst.writableBytes());
+   }
+
+   public void readBytes(final HornetQBuffer dst, final int length)
+   {
+      if (length > dst.writableBytes())
+      {
+         throw new IndexOutOfBoundsException();
+      }
+      readBytes(dst, dst.writerIndex(), length);
+      dst.writerIndex(dst.writerIndex() + length);
+   }
+
+   /**
+    * Reads {@code length} bytes into a temporary array and stores them in {@code dst} at
+    * {@code dstIndex} via {@code dst.setBytes}.
+    */
+   public void readBytes(final HornetQBuffer dst, final int dstIndex, final int length)
+   {
+      final byte[] chunk = new byte[length];
+      readBytes(chunk, 0, chunk.length);
+      dst.setBytes(dstIndex, chunk);
+   }
+
+   /**
+    * Reads {@code dst.remaining()} bytes into a temporary array and puts them into {@code dst}.
+    */
+   public void readBytes(final ByteBuffer dst)
+   {
+      final byte[] chunk = new byte[dst.remaining()];
+      readBytes(chunk, 0, chunk.length);
+      dst.put(chunk);
+   }
+
+   /**
+    * Not implemented: always throws.
+    *
+    * @throws IllegalStateException always, with the message "Not implemented!"
+    */
+   public int readBytes(final GatheringByteChannel out, final int length) throws IOException
+   {
+      throw new IllegalStateException("Not implemented!");
+   }
+
+   /**
+    * Not implemented: always throws.
+    *
+    * @throws IllegalStateException always, with the message "Not implemented!"
+    */
+   public void readBytes(final OutputStream out, final int length) throws IOException
+   {
+      throw new IllegalStateException("Not implemented!");
+   }
+
+   /**
+    * Discards {@code length} bytes by reading them one at a time from the stream
+    * returned by {@code getStream()}. The value returned by each read is ignored.
+    *
+    * @param length the number of single-byte reads to perform
+    * @throws IllegalStateException wrapping any exception raised while reading
+    */
+   public void skipBytes(final int length)
+   {
+      try
+      {
+         int pending = length;
+         while (pending > 0)
+         {
+            getStream().read();
+            pending--;
+         }
+      }
+      catch (Exception e)
+      {
+         throw new IllegalStateException(e.getMessage(), e);
+      }
+   }
+
+   // Unsupported operations ----------------------------------------
+   // Every method from here down to toString(String) rejects the call by throwing
+   // IllegalAccessError(OPERATION_NOT_SUPPORTED). IllegalAccessError is a
+   // java.lang.Error, so a caller's catch (Exception) will not intercept it.
+
+   public void writeByte(final byte value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeShort(final short value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeMedium(final int value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeInt(final int value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeLong(final long value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeBytes(final byte[] src, final int srcIndex, final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeBytes(final byte[] src)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeBytes(final HornetQBuffer src)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeBytes(final HornetQBuffer src, final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeBytes(final ByteBuffer src)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public int writeBytes(final InputStream in, final int length) throws IOException
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public int writeBytes(final ScatteringByteChannel in, final int length) throws IOException
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeZero(final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public ByteBuffer toByteBuffer()
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public ByteBuffer[] toByteBuffers()
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public ByteBuffer[] toByteBuffers(final int index, final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public String toString(final String charsetName)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /**
+    * Returns this object itself; no separate backing buffer is exposed.
+    *
+    * @return {@code this}
+    */
+   public Object getUnderlyingBuffer()
+   {
+      return this;
+   }
+
+   /**
+    * Reads one byte and interprets any non-zero value as {@code true}.
+    *
+    * @return {@code true} if the byte read is not 0
+    */
+   public boolean readBoolean()
+   {
+      return readByte() != 0;
+   }
+
+   /**
+    * Reads a short and casts it to {@code char}.
+    *
+    * @return the 16-bit value read, as a char
+    */
+   public char readChar()
+   {
+      return (char)readShort();
+   }
+   
+   /**
+    * Positional char read, expressed as {@code getShort(index)} cast to {@code char}.
+    * Behaviour therefore follows whatever {@code getShort(int)} does in this class.
+    */
+   public char getChar(final int index)
+   {
+      return (char)getShort(index);
+   }
+
+   /**
+    * Positional double read: converts the bits returned by {@code getLong(index)}
+    * with {@link Double#longBitsToDouble(long)}.
+    */
+   public double getDouble(final int index)
+   {
+      return Double.longBitsToDouble(getLong(index));
+   }
+
+   /**
+    * Positional float read: converts the bits returned by {@code getInt(index)}
+    * with {@link Float#intBitsToFloat(int)}.
+    */
+   public float getFloat(final int index)
+   {
+      return Float.intBitsToFloat(getInt(index));
+   }
+
+   /**
+    * Reads {@code length} bytes into a new array and returns a buffer wrapping that array.
+    *
+    * @param length the number of bytes to read
+    * @return the result of {@code HornetQBuffers.wrappedBuffer} on the bytes read
+    */
+   public HornetQBuffer readBytes(final int length)
+   {
+      final byte[] payload = new byte[length];
+      readBytes(payload);
+      return HornetQBuffers.wrappedBuffer(payload);
+   }
+
+   /**
+    * Reads a long and converts its bits with {@link Double#longBitsToDouble(long)}.
+    *
+    * @return the double represented by the next eight bytes
+    */
+   public double readDouble()
+   {
+      return Double.longBitsToDouble(readLong());
+   }
+
+   /**
+    * Reads an int and converts its bits with {@link Float#intBitsToFloat(int)}.
+    *
+    * @return the float represented by the next four bytes
+    */
+   public float readFloat()
+   {
+      return Float.intBitsToFloat(readInt());
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#readNullableSimpleString()
+    */
+   public SimpleString readNullableSimpleString()
+   {
+      int b = readByte();
+      if (b == DataConstants.NULL)
+      {
+         return null;
+      }
+      else
+      {
+         return readSimpleString();
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#readNullableString()
+    */
+   public String readNullableString()
+   {
+      int b = readByte();
+      if (b == DataConstants.NULL)
+      {
+         return null;
+      }
+      else
+      {
+         return readString();
+      }
+   }
+
+   /**
+    * Reads an int length prefix and then that many bytes, and wraps the bytes in a
+    * {@link SimpleString}.
+    *
+    * @return the SimpleString built from the bytes read
+    */
+   public SimpleString readSimpleString()
+   {
+      final byte[] raw = new byte[readInt()];
+      readBytes(raw);
+      return new SimpleString(raw);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#readString()
+    */
+   public String readString()
+   {
+      int len = readInt();
+
+      if (len < 9)
+      {
+         char[] chars = new char[len];
+         for (int i = 0; i < len; i++)
+         {
+            chars[i] = (char)readShort();
+         }
+         return new String(chars);
+      }
+      else if (len < 0xfff)
+      {
+         return readUTF();
+      }
+      else
+      {
+         return readSimpleString().toString();
+      }
+   }
+
+   /**
+    * Reads a String by handing this buffer to {@code UTF8Util.readUTF}.
+    *
+    * @return the String decoded by {@code UTF8Util}
+    */
+   public String readUTF()
+   {
+      return UTF8Util.readUTF(this);
+   }
+
+   // Unsupported typed writes --------------------------------------
+   // Each write method below throws IllegalAccessError(OPERATION_NOT_SUPPORTED).
+   // IllegalAccessError is a java.lang.Error, so catch (Exception) will not intercept it.
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeBoolean(boolean)
+    */
+   public void writeBoolean(final boolean val)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeChar(char)
+    */
+   public void writeChar(final char val)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeDouble(double)
+    */
+   public void writeDouble(final double val)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeFloat(float)
+    */
+   public void writeFloat(final float val)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeNullableSimpleString(org.hornetq.util.SimpleString)
+    */
+   public void writeNullableSimpleString(final SimpleString val)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeNullableString(java.lang.String)
+    */
+   public void writeNullableString(final String val)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeSimpleString(org.hornetq.util.SimpleString)
+    */
+   public void writeSimpleString(final SimpleString val)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeString(java.lang.String)
+    */
+   public void writeString(final String val)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeUTF(java.lang.String)
+    */
+   public void writeUTF(final String utf)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /**
+    * Placeholder comparison: unconditionally returns -1 and never inspects {@code buffer}.
+    * <p>
+    * NOTE(review): a constant result means {@code x.compareTo(x)} is not 0, which does
+    * not satisfy the usual {@link Comparable} contract; confirm no caller sorts or
+    * orders these buffers.
+    *
+    * @param buffer the buffer to compare with; ignored
+    * @return always -1
+    */
+   public int compareTo(final HornetQBuffer buffer)
+   {
+      return -1;
+   }
+
+   /**
+    * Not supported.
+    *
+    * @throws UnsupportedOperationException always
+    */
+   public HornetQBuffer copy()
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   /**
+    * Not supported.
+    *
+    * @throws UnsupportedOperationException always
+    */
+   public HornetQBuffer slice(final int index, final int length)
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * @param body
+    */
+   // Inner classes -------------------------------------------------
+
+   /**
+    * This implementation exposes no {@code ChannelBuffer}.
+    *
+    * @return always {@code null}; callers must be prepared for that
+    */
+   public ChannelBuffer channelBuffer()
+   {
+      return null;
+   }
+
+   // Unsupported copy/slice/set operations -------------------------
+   // Each method below throws IllegalAccessError(OPERATION_NOT_SUPPORTED).
+   // IllegalAccessError is a java.lang.Error, so catch (Exception) will not intercept it.
+
+   public HornetQBuffer copy(final int index, final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public HornetQBuffer duplicate()
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public HornetQBuffer readSlice(final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void setChar(final int index, final char value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void setDouble(final int index, final double value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void setFloat(final int index, final float value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public HornetQBuffer slice()
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeBytes(final HornetQBuffer src, final int srcIndex, final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+}

Added: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java	                        (rev 0)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.concurrent.Executor;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.logging.Logger;
+
+/**
+ * Static helpers that run GZIP compression or decompression on a caller-supplied
+ * {@link Executor}, so the calling thread can keep working with plain streams.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class GZipUtil
+{
+
+   private static final Logger log = Logger.getLogger(GZipUtil.class);
+
+   /** Size, in bytes, of the scratch buffer used when pumping one stream into another. */
+   private static final int COPY_BUFFER_SIZE = 1024;
+
+   /**
+    * Returns an InputStream that yields the compressed (or decompressed) form of
+    * {@code inputStreamParameter}.
+    * <p>
+    * The transformation runs as a task on {@code threadPool}, which pumps data into a
+    * {@link PipedOutputStream} connected to the returned {@link PipedInputStream}.
+    * When decompressing, the source is wrapped in a {@link GZIPInputStream} on the
+    * calling thread before the task is submitted.
+    * <p>
+    * If the task fails, the failure is logged and the pipe is closed; the exception is
+    * not propagated to the reader of the returned stream. The source stream is not
+    * closed by this method.
+    * <p>
+    * TODO: We would need an inverted GZipInputStream (that would compress on reading)
+    * to avoid creating this thread (through an executor)
+    *
+    * @param inputStreamParameter the stream to read the source data from
+    * @param compress {@code true} to compress, {@code false} to decompress
+    * @param threadPool the executor that runs the pumping task
+    * @return the read end of the pipe carrying the transformed data
+    * @throws HornetQException if the GZIP input wrapper cannot be created or the pipe
+    *         cannot be connected
+    */
+   public static InputStream pipeGZip(final InputStream inputStreamParameter, final boolean compress, final Executor threadPool) throws HornetQException
+   {
+      final InputStream input;
+      if (compress)
+      {
+         input = inputStreamParameter;
+      }
+      else
+      {
+         try
+         {
+            input = new GZIPInputStream(new BufferedInputStream(inputStreamParameter));
+         }
+         catch (IOException e)
+         {
+            throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, e.getMessage(), e);
+         }
+      }
+
+      final PipedOutputStream pipedOut = new PipedOutputStream();
+      final PipedInputStream pipedInput = new PipedInputStream();
+      try
+      {
+         pipedOut.connect(pipedInput);
+      }
+      catch (IOException e)
+      {
+         throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, e.getMessage(), e);
+      }
+
+      threadPool.execute(new Runnable()
+      {
+
+         public void run()
+         {
+            try
+            {
+               final OutputStream out;
+               if (compress)
+               {
+                  out = new GZIPOutputStream(new BufferedOutputStream(pipedOut));
+               }
+               else
+               {
+                  out = new BufferedOutputStream(pipedOut);
+               }
+               copy(input, out);
+               // close() flushes the buffer (and the GZIP trailer) and closes the pipe
+               out.close();
+            }
+            catch (Exception e)
+            {
+               // pass the exception itself so the stack trace is not lost
+               log.warn(e.getMessage(), e);
+               closeQuietly(pipedOut);
+            }
+         }
+      });
+
+      return pipedInput;
+   }
+
+   /**
+    * Pumps {@code input} into {@code output} through a {@link GZIPOutputStream}, as a
+    * task on {@code threadPool}. The output stream is closed when the task finishes,
+    * whether it succeeds or fails; failures are logged and not propagated.
+    * <p>
+    * NOTE(review): despite the method name, the data written to {@code output} is
+    * GZIP-compressed, not decompressed. Confirm the intended direction with the callers
+    * before relying on this method.
+    *
+    * @param input the stream to read from
+    * @param output the stream that receives the GZIP output
+    * @param threadPool the executor that runs the pumping task
+    * @throws HornetQException declared for callers; not thrown by the visible code
+    */
+   public static void deZip(final InputStream input, final OutputStream output, final Executor threadPool) throws HornetQException
+   {
+      threadPool.execute(new Runnable()
+      {
+
+         public void run()
+         {
+            OutputStream out = null;
+
+            try
+            {
+               out = new GZIPOutputStream(new BufferedOutputStream(output));
+               copy(input, out);
+               out.close();
+            }
+            catch (Exception e)
+            {
+               log.warn(e.getMessage(), e);
+               // out is still null if the GZIP wrapper could not be created; in that
+               // case close the raw stream so it is not left open
+               closeQuietly(out != null ? out : output);
+            }
+         }
+      });
+   }
+
+   /**
+    * Copies {@code from} into {@code to} until a read returns no data.
+    */
+   private static void copy(final InputStream from, final OutputStream to) throws IOException
+   {
+      final byte[] chunk = new byte[COPY_BUFFER_SIZE];
+      int size;
+      while ((size = from.read(chunk)) > 0)
+      {
+         to.write(chunk, 0, size);
+      }
+   }
+
+   /**
+    * Closes {@code stream} if it is not null, ignoring any failure: this is only used
+    * on error paths where the original problem has already been logged.
+    */
+   private static void closeQuietly(final OutputStream stream)
+   {
+      if (stream == null)
+      {
+         return;
+      }
+      try
+      {
+         stream.close();
+      }
+      catch (Exception ignored)
+      {
+      }
+   }
+
+}

Added: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/HornetQBufferInputStream.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/HornetQBufferInputStream.java	                        (rev 0)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/HornetQBufferInputStream.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * An {@link InputStream} over the readable bytes of a {@link HornetQBuffer}, i.e. the
+ * bytes between the buffer's reader index and its writer index.
+ * Used to send large messages.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class HornetQBufferInputStream extends InputStream
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   /** The wrapped buffer; set to null once the stream has been closed. */
+   private HornetQBuffer bb;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @param paramByteBuffer the buffer to read from
+    */
+   public HornetQBufferInputStream(final HornetQBuffer paramByteBuffer)
+   {
+      bb = paramByteBuffer;
+   }
+
+   /**
+    * Reads the next byte.
+    *
+    * @return the byte as an int in the range 0..255, or -1 when no readable bytes remain
+    * @throws IOException if the stream has been closed
+    */
+   @Override
+   public int read() throws IOException
+   {
+      if (bb == null)
+      {
+         throw new IOException("read on a closed InputStream");
+      }
+
+      if (remainingBytes() == 0)
+      {
+         return -1;
+      }
+      else
+      {
+         // readByte() is signed; mask it so that 0xFF is returned as 255 and cannot be
+         // mistaken for the -1 end-of-stream marker
+         return bb.readByte() & 0xFF;
+      }
+   }
+
+   /**
+    * Equivalent to {@code read(byteArray, 0, byteArray.length)}.
+    *
+    * @throws IOException if the stream has been closed
+    */
+   @Override
+   public int read(final byte[] byteArray) throws IOException
+   {
+      if (bb == null)
+      {
+         throw new IOException("read on a closed InputStream");
+      }
+
+      return read(byteArray, 0, byteArray.length);
+   }
+
+   /**
+    * Reads up to {@code len} bytes into {@code byteArray} starting at {@code off}.
+    *
+    * @return the number of bytes copied, 0 if {@code len} is 0, or -1 when no readable
+    *         bytes remain
+    * @throws IOException if the stream has been closed
+    * @throws NullPointerException if {@code byteArray} is null
+    * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not describe a
+    *         valid range of {@code byteArray}
+    */
+   @Override
+   public int read(final byte[] byteArray, final int off, final int len) throws IOException
+   {
+      if (bb == null)
+      {
+         throw new IOException("read on a closed InputStream");
+      }
+
+      if (byteArray == null)
+      {
+         throw new NullPointerException();
+      }
+      if (off < 0 || off > byteArray.length || len < 0 || off + len > byteArray.length || off + len < 0)
+      {
+         throw new IndexOutOfBoundsException();
+      }
+      if (len == 0)
+      {
+         return 0;
+      }
+
+      int size = Math.min(remainingBytes(), len);
+
+      if (size == 0)
+      {
+         return -1;
+      }
+
+      bb.readBytes(byteArray, off, size);
+      return size;
+   }
+
+   /**
+    * Skips up to {@code len} bytes, limited by the number of readable bytes.
+    *
+    * @return the number of bytes actually skipped; 0 if {@code len} is not positive
+    * @throws IOException if the stream has been closed
+    */
+   @Override
+   public long skip(final long len) throws IOException
+   {
+      if (bb == null)
+      {
+         throw new IOException("skip on a closed InputStream");
+      }
+
+      if (len <= 0L)
+      {
+         return 0L;
+      }
+
+      // Clamp in long arithmetic first: casting len to int before the comparison would
+      // wrap for values above Integer.MAX_VALUE and could yield a negative skip
+      int size = (int)Math.min((long)remainingBytes(), len);
+
+      bb.skipBytes(size);
+
+      return size;
+   }
+
+   /**
+    * @return the number of readable bytes left in the buffer
+    * @throws IOException if the stream has been closed
+    */
+   @Override
+   public int available() throws IOException
+   {
+      if (bb == null)
+      {
+         throw new IOException("available on a closed InputStream");
+      }
+
+      return remainingBytes();
+   }
+
+   /**
+    * Drops the reference to the buffer; subsequent reads, skips and available() calls
+    * throw IOException.
+    */
+   @Override
+   public void close() throws IOException
+   {
+      bb = null;
+   }
+
+   /** No-op: mark/reset is not supported (see {@link #markSupported()}). */
+   @Override
+   public synchronized void mark(final int paramInt)
+   {
+   }
+
+   /**
+    * @throws IOException always, since mark/reset is not supported
+    */
+   @Override
+   public synchronized void reset() throws IOException
+   {
+      throw new IOException("mark/reset not supported");
+   }
+
+   /** @return always {@code false} */
+   @Override
+   public boolean markSupported()
+   {
+      return false;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * @return the distance between the buffer's writer index and its reader index
+    */
+   private int remainingBytes()
+   {
+      return bb.writerIndex() - bb.readerIndex();
+   }
+
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Branch_Large_Message_Compression/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
===================================================================
--- branches/Branch_Large_Message_Compression/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml	2010-08-20 22:30:33 UTC (rev 9575)
@@ -20,6 +20,7 @@
       <producer-window-size>7712652</producer-window-size>
       <producer-max-rate>789</producer-max-rate>
       <min-large-message-size>12</min-large-message-size>
+      <compress-large-messages>true</compress-large-messages>
       <client-id>TestClientID</client-id>
       <dups-ok-batch-size>3456</dups-ok-batch-size>
       <transaction-batch-size>4567</transaction-batch-size>

Modified: branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -72,6 +72,7 @@
                                                        HornetQClient.DEFAULT_CALL_TIMEOUT,
                                                        HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                                        HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                       HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                                        HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                                        HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                                        HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,

Modified: branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -68,6 +68,7 @@
                                                     HornetQClient.DEFAULT_CALL_TIMEOUT,
                                                     HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                                     HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                    HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                                     HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                                     HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                                     HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,

Modified: branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -298,6 +298,7 @@
                                                     HornetQClient.DEFAULT_CALL_TIMEOUT,
                                                     HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                                     HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                    HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                                     prefetchSize,
                                                     HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                                     HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,

Added: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/CompressedLargeMessageTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/CompressedLargeMessageTest.java	                        (rev 0)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/CompressedLargeMessageTest.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+
+/**
+ * Re-runs every test inherited from {@link LargeMessageTest} with a session factory
+ * that has large-message compression switched on.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class CompressedLargeMessageTest extends LargeMessageTest
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Builds the factory exactly as the parent test does, then enables compression
+    * of large messages on it.
+    *
+    * @param isNetty passed through unchanged to the parent implementation
+    * @return the parent's factory with {@code setCompressLargeMessages(true)} applied
+    */
+   protected ClientSessionFactoryImpl createFactory(final boolean isNetty)
+   {
+      final ClientSessionFactoryImpl compressingFactory = super.createFactory(isNetty);
+      compressingFactory.setCompressLargeMessages(true);
+      return compressingFactory;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -151,7 +151,7 @@
    public void doTestLargeBuffer(boolean transacted) throws Exception
    {
       final int journalsize = 100 * 1024;
-      final int messageSize = 3 * journalsize;
+      final int messageSize = 3 * journalsize + 5;
       // final int messageSize = 5 * 1024;
 
       ClientSession session = null;
@@ -169,6 +169,8 @@
          server.start();
 
          ClientSessionFactory sf = createFactory(isNetty());
+         
+         sf.setCompressLargeMessages(true);
 
          session = sf.createSession(!transacted, !transacted, 0);
 

Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -136,6 +136,7 @@
                                             callTimeout,
                                             HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                             HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                            HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                             HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                             HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                             HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,

Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -206,6 +206,7 @@
                                         callTimeout,
                                         HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                         HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                        HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                         HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                         HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                         HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,

Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -305,6 +305,7 @@
                                         callTimeout,
                                         true,
                                         HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                        HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                         HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                         HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                         HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,

Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -74,6 +74,7 @@
                                         HornetQClient.DEFAULT_CALL_TIMEOUT,
                                         HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                         HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                        HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                         HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                         HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                         HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,

Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -241,6 +241,7 @@
                                         callTimeout,
                                         true,
                                         HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                        HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                         HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                         HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                         HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,

Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -152,6 +152,7 @@
                                         callTimeout,
                                         HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                         HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                        HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                         HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                         HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                         HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,

Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -74,6 +74,7 @@
       assertEquals(7712652, cfConfig.getProducerWindowSize());
       assertEquals(789, cfConfig.getProducerMaxRate());
       assertEquals(12, cfConfig.getMinLargeMessageSize());
+      assertEquals(true, cfConfig.isCompressLargeMessages());
       assertEquals("TestClientID", cfConfig.getClientID());
       assertEquals(3456, cfConfig.getDupsOKBatchSize());
       assertEquals(4567, cfConfig.getTransactionBatchSize());

Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -15,6 +15,7 @@
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
@@ -35,12 +36,14 @@
 import org.hornetq.api.core.client.MessageHandler;
 import org.hornetq.core.client.impl.ClientConsumerInternal;
 import org.hornetq.core.client.impl.ClientMessageInternal;
+import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.client.impl.LargeMessageBufferImpl;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
 import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.HornetQBufferInputStream;
 
 /**
  * A LargeMessageBufferUnitTest
@@ -56,7 +59,7 @@
 
    // Attributes ----------------------------------------------------
 
-   static int tmpFileCounter = 0; 
+   static int tmpFileCounter = 0;
 
    // Static --------------------------------------------------------
 
@@ -67,13 +70,13 @@
    protected void setUp() throws Exception
    {
       super.setUp();
-      
+
       tmpFileCounter++;
 
       File tmp = new File(getTestDir());
       tmp.mkdirs();
    }
-   
+
    protected void tearDown() throws Exception
    {
       super.tearDown();
@@ -166,6 +169,20 @@
       }
    }
 
+   public void testReadIntegersOverStream() throws Exception
+   {
+      // The values 1..15 must come back in order when read through a DataInputStream.
+      LargeMessageBufferImpl source = createBufferWithIntegers(3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
+      DataInputStream in = new DataInputStream(new HornetQBufferInputStream(source));
+      int expected = 1;
+      while (expected <= 15)
+      {
+         Assert.assertEquals(expected, in.readInt());
+         expected++;
+      }
+      assertEquals(-1, in.read()); // nothing may remain after the last value
+   }
+
    // testing void getBytes(int index, ChannelBuffer dst, int dstIndex, int length)
    public void testReadLongs() throws Exception
    {
@@ -186,6 +203,20 @@
       }
    }
 
+   public void testReadLongsOverStream() throws Exception
+   {
+      // The values 1..15 must come back in order when read as longs through a DataInputStream.
+      LargeMessageBufferImpl source = createBufferWithLongs(3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
+      DataInputStream in = new DataInputStream(new HornetQBufferInputStream(source));
+      int expected = 1;
+      while (expected <= 15)
+      {
+         Assert.assertEquals(expected, in.readLong());
+         expected++;
+      }
+      assertEquals(-1, in.read()); // nothing may remain after the last value
+   }
+
    public void testReadData() throws Exception
    {
       HornetQBuffer dynamic = HornetQBuffers.dynamicBuffer(1);
@@ -315,14 +346,14 @@
          Assert.assertEquals(i, bytes[i]);
       }
    }
-   
+
    public void testSplitBufferOnFile() throws Exception
    {
       LargeMessageBufferImpl outBuffer = new LargeMessageBufferImpl(new FakeConsumerInternal(),
-                                                                          1024 * 1024,
-                                                                          1,
-                                                                          getTestFile(),
-                                                                          1024);
+                                                                    1024 * 1024,
+                                                                    1,
+                                                                    getTestFile(),
+                                                                    1024);
       try
       {
 
@@ -525,6 +556,36 @@
 
    }
 
+   public void testReadBytesOnStreaming() throws Exception
+   {
+      byte[] byteArray = new byte[1024];
+      for (int i = 0; i < byteArray.length; i++)
+      {
+         byteArray[i] = getSamplebyte(i);
+      }
+
+      HornetQBuffer splitbuffer = splitBuffer(3, byteArray);
+
+      HornetQBufferInputStream is = new HornetQBufferInputStream(splitbuffer);
+      // First consume 100 bytes one at a time.
+      for (int i = 0; i < 100; i++)
+      {
+         assertEquals(getSamplebyte(i), (byte)is.read());
+      }
+      // Then read the remaining 924 bytes in 10-byte chunks (the last chunk holds only 4 bytes).
+      for (int i = 100; i < byteArray.length; i += 10)
+      {
+         byte readBytes[] = new byte[10];
+         // The i += 10 arithmetic relies on full reads; a short or -1 read must fail loudly.
+         int size = is.read(readBytes);
+         assertEquals(Math.min(readBytes.length, byteArray.length - i), size);
+         for (int j = 0; j < size; j++)
+         {
+            assertEquals(getSamplebyte(i + j), readBytes[j]);
+         }
+      }
+   }
+
    /**
     * @return
     */
@@ -795,6 +856,15 @@
          return null;
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.client.impl.ClientConsumerInternal#getSession()
+       */
+      public ClientSessionInternal getSession()
+      {
+         // Stub: this implementation has no session to expose and always returns null.
+         return null;
+      }
+
    }
 
 }

Added: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java	                        (rev 0)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.util;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.HornetQBufferInputStream;
+
+/**
+ * Unit test for {@link HornetQBufferInputStream} reading from a wrapped {@link HornetQBuffer}.
+ * The single test reads 10 KiB of sample data first byte by byte, then in 1 KiB chunks,
+ * and finally checks that both read variants report end of stream.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class HornetQBufferInputStreamTest extends UnitTestCase
+{
+
+   
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testReadBytes() throws Exception
+   {
+      byte bytes[] = new byte[10*1024];
+      for (int i = 0 ; i < bytes.length; i++)
+      {
+         bytes[i] = getSamplebyte(i);
+      }
+      
+      HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+      HornetQBufferInputStream is = new HornetQBufferInputStream(buffer);
+      
+      // First read byte per byte; read() yields 0..255, so compare against the unsigned sample value
+      for (int i = 0 ; i < 1024; i++)
+      {
+         assertEquals(getSamplebyte(i) & 0xFF, is.read());
+      }
+      
+      // Second, read in chunks; each chunk must be filled completely for the index math below to hold
+      for (int i = 1; i < 10; i++)
+      {
+         bytes = new byte[1024];
+         assertEquals(bytes.length, is.read(bytes));
+         for (int j = 0 ; j < bytes.length; j++)
+         {
+            assertEquals(getSamplebyte(i * 1024 + j), bytes[j]);
+         }
+         
+      }
+      
+      assertEquals(-1, is.read());
+      
+      // A bulk read at end of stream must report -1 as well
+      bytes = new byte[1024];
+      
+      int sizeRead = is.read(bytes);
+      
+      assertEquals(-1, sizeRead);
+      
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/util/JMSTestBase.java	2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/util/JMSTestBase.java	2010-08-20 22:30:33 UTC (rev 9575)
@@ -190,6 +190,7 @@
                                         callTimeout,
                                         HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                         HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                        HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                         HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                         HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                         HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,



More information about the hornetq-commits mailing list