[hornetq-commits] JBoss hornetq SVN: r9993 - in trunk: src/main/org/hornetq/api/core and 18 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sun Dec 5 05:00:14 EST 2010


Author: gaohoward
Date: 2010-12-05 05:00:09 -0500 (Sun, 05 Dec 2010)
New Revision: 9993

Added:
   trunk/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
   trunk/src/main/org/hornetq/utils/DeflaterReader.java
   trunk/src/main/org/hornetq/utils/HornetQBufferInputStream.java
   trunk/src/main/org/hornetq/utils/InflaterReader.java
   trunk/src/main/org/hornetq/utils/InflaterWriter.java
   trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
   trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java
   trunk/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java
Modified:
   trunk/src/config/common/schema/hornetq-jms.xsd
   trunk/src/main/org/hornetq/api/core/Message.java
   trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
   trunk/src/main/org/hornetq/api/core/client/HornetQClient.java
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
   trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
   trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
   trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
   trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
   trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java
   trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
   trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
   trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
   trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
   trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
   trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
   trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
   trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
   trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
   trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
   trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
   trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
   trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java
   trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
   trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
   trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
   trunk/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
   trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
   trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:

HORNETQ-448

Large Message Compression Impl


Modified: trunk/src/config/common/schema/hornetq-jms.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-jms.xsd	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/config/common/schema/hornetq-jms.xsd	2010-12-05 10:00:09 UTC (rev 9993)
@@ -75,7 +75,11 @@
             </xsd:element>
    			<xsd:element name="min-large-message-size" type="xsd:long"
    				maxOccurs="1" minOccurs="0">
-   			</xsd:element>   			          
+   			</xsd:element>   			 
+   			<xsd:element name="compress-large-messages" type="xsd:boolean"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+   			         
             <xsd:element name="client-id" type="xsd:string"
                 maxOccurs="1" minOccurs="0">
             </xsd:element>            

Modified: trunk/src/main/org/hornetq/api/core/Message.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/Message.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/api/core/Message.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -65,6 +65,8 @@
    public static final SimpleString HDR_ORIG_MESSAGE_ID = new SimpleString("_HQ_ORIG_MESSAGE_ID");
 
    public static final SimpleString HDR_GROUP_ID = new SimpleString("_HQ_GROUP_ID");
+   
+   public static final SimpleString HDR_LARGE_COMPRESSED = new SimpleString("_HQ_LARGE_COMPRESSED");
 
    public static final SimpleString HDR_SCHEDULED_DELIVERY_TIME = new SimpleString("_HQ_SCHED_DELIVERY");
 

Modified: trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -138,4 +138,7 @@
    
    CoreRemotingConnection getConnection();
 
+   void setCompressLargeMessages(boolean compressLargeMessage);
+   
+   boolean isCompressLargeMessages();
 }

Modified: trunk/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/HornetQClient.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/api/core/client/HornetQClient.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -43,6 +43,8 @@
    // Any message beyond this size is considered a large message (to be sent in chunks)
    
    public static final int DEFAULT_MIN_LARGE_MESSAGE_SIZE = 100 * 1024;
+   
+   public static final boolean DEFAULT_COMPRESS_LARGE_MESSAGES = false;
 
    public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;
 

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -29,6 +29,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.utils.DecompressedLargeMessageBuffer;
 import org.hornetq.utils.Future;
 import org.hornetq.utils.PriorityLinkedList;
 import org.hornetq.utils.PriorityLinkedListImpl;
@@ -454,6 +455,11 @@
    // ClientConsumerInternal implementation
    // --------------------------------------------------------------
 
+   public ClientSessionInternal getSession()
+   {
+      return session;
+   }
+   
    public SessionQueueQueryResponseMessage getQueueInfo()
    {
       return queueInfo;
@@ -554,7 +560,14 @@
 
       currentLargeMessageBuffer = new LargeMessageBufferImpl(this, packet.getLargeMessageSize(), 60, largeMessageCache);
 
-      currentChunkMessage.setBuffer(currentLargeMessageBuffer);
+      if (currentChunkMessage.isCompressed())
+      {
+         currentChunkMessage.setBuffer(new DecompressedLargeMessageBuffer(currentLargeMessageBuffer));
+      }
+      else
+      {
+         currentChunkMessage.setBuffer(currentLargeMessageBuffer);
+      }
 
       currentChunkMessage.setFlowControlSize(0);
 

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -67,4 +67,6 @@
    void start();
    
    SessionQueueQueryResponseMessage getQueueInfo();
+   
+   ClientSessionInternal getSession();
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -21,11 +21,11 @@
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.utils.DataConstants;
 
 /**
  * 
@@ -117,6 +117,11 @@
    {
       return largeMessage;
    }
+   
+   public boolean isCompressed()
+   {
+      return properties.getBooleanProperty(Message.HDR_LARGE_COMPRESSED);
+   }
 
    /**
     * @param largeMessage the largeMessage to set
@@ -142,7 +147,6 @@
              "]";
    }
 
-   // FIXME - only used for large messages - move it!
    /* (non-Javadoc)
     * @see org.hornetq.api.core.client.ClientMessage#saveToOutputStream(java.io.OutputStream)
     */
@@ -150,7 +154,7 @@
    {
       if (largeMessage)
       {
-         ((LargeMessageBufferInternal)getWholeBuffer()).saveBuffer(out);
+          ((LargeMessageBufferInternal)getWholeBuffer()).saveBuffer(out);
       }
       else
       {

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -44,4 +44,6 @@
    void discardLargeBody();
 
    void setBuffer(HornetQBuffer buffer);
+   
+   boolean isCompressed();
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -22,12 +22,13 @@
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.message.impl.MessageInternal;
 import org.hornetq.core.protocol.core.Channel;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.utils.DeflaterReader;
+import org.hornetq.utils.HornetQBufferInputStream;
 import org.hornetq.utils.TokenBucketLimiter;
 import org.hornetq.utils.UUIDGenerator;
 
@@ -150,7 +151,7 @@
       {
          return;
       }
-            
+
       doCleanup();
    }
 
@@ -190,7 +191,7 @@
    {
       return credits;
    }
-   
+
    // Protected ------------------------------------------------------------------------------------
 
    // Package Private ------------------------------------------------------------------------------
@@ -203,7 +204,7 @@
       {
          session.returnCredits(address);
       }
-      
+
       session.removeProducer(this);
 
       closed = true;
@@ -212,12 +213,13 @@
    private void doSend(final SimpleString address, final Message msg) throws HornetQException
    {
       MessageInternal msgI = (MessageInternal)msg;
-      
+
       ClientProducerCredits theCredits;
-      
+
       boolean isLarge;
 
-      if (msgI.getBodyInputStream() != null || msgI.isLargeMessage() || msgI.getBodyBuffer().writerIndex() > minLargeMessageSize)
+      if (msgI.getBodyInputStream() != null || msgI.isLargeMessage() ||
+          msgI.getBodyBuffer().writerIndex() > minLargeMessageSize)
       {
          isLarge = true;
       }
@@ -236,7 +238,7 @@
          {
             msg.setAddress(address);
          }
-         
+
          // Anonymous
          theCredits = session.getCredits(address, true);
       }
@@ -250,7 +252,7 @@
          {
             msg.setAddress(this.address);
          }
-         
+
          theCredits = credits;
       }
 
@@ -270,8 +272,6 @@
 
       session.workDone();
 
-      
-
       if (isLarge)
       {
          largeMessageSend(sendBlocking, msgI, theCredits);
@@ -322,8 +322,16 @@
     * @param msgI
     * @throws HornetQException
     */
-   private void largeMessageSend(final boolean sendBlocking, final MessageInternal msgI, final ClientProducerCredits credits) throws HornetQException
+   private void largeMessageSend(final boolean sendBlocking,
+                                 final MessageInternal msgI,
+                                 final ClientProducerCredits credits) throws HornetQException
    {
+
+      if (session.isCompressLargeMessages())
+      {
+         msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, true);
+      }
+
       int headerSize = msgI.getHeadersAndPropertiesEncodeSize();
 
       if (headerSize >= minLargeMessageSize)
@@ -341,7 +349,6 @@
       HornetQBuffer headerBuffer = HornetQBuffers.fixedBuffer(headerSize);
 
       msgI.encodeHeadersAndProperties(headerBuffer);
-
       SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(headerBuffer.toByteBuffer().array());
 
       channel.send(initialChunk);
@@ -358,7 +365,7 @@
 
       if (input != null)
       {
-         largeMessageSendStreamed(sendBlocking, input, credits);
+         largeMessageSendStreamed(sendBlocking, msgI, input, credits);
       }
       else
       {
@@ -375,72 +382,29 @@
                                          final MessageInternal msgI,
                                          final ClientProducerCredits credits) throws HornetQException
    {
-      BodyEncoder context = msgI.getBodyEncoder();
-
-      final long bodySize = context.getLargeBodySize();
-
-      context.open();
-      try
-      {
-
-         for (int pos = 0; pos < bodySize;)
-         {
-            final boolean lastChunk;
-
-            final int chunkLength = Math.min((int)(bodySize - pos), minLargeMessageSize);
-
-            final HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(chunkLength);
-
-            context.encode(bodyBuffer, chunkLength);
-
-            pos += chunkLength;
-
-            lastChunk = pos >= bodySize;
-
-            final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.toByteBuffer()
-                                                                                                      .array(),
-                                                                                            !lastChunk,
-                                                                                            lastChunk && sendBlocking);
-
-            if (sendBlocking && lastChunk)
-            {
-               // When sending it blocking, only the last chunk will be blocking.
-               channel.sendBlocking(chunk);
-            }
-            else
-            {
-               channel.send(chunk);
-            }
-
-            try
-            {
-               credits.acquireCredits(chunk.getPacketSize());
-            }
-            catch (InterruptedException e)
-            {
-            }
-         }
-      }
-      finally
-      {
-         context.close();
-      }
+      msgI.getBodyBuffer().readerIndex(0);
+      largeMessageSendStreamed(sendBlocking, msgI, new HornetQBufferInputStream(msgI.getBodyBuffer()), credits);
    }
 
    /**
-    * TODO: This method could be eliminated and 
-    *       combined with {@link ClientProducerImpl#largeMessageSendBuffered(boolean, Message, ClientProducerCredits)}. 
-    *       All that's needed for this is ClientMessage returning the proper BodyEncoder for streamed
     * @param sendBlocking
     * @param input
     * @throws HornetQException
     */
    private void largeMessageSendStreamed(final boolean sendBlocking,
-                                         final InputStream input,
+                                         final MessageInternal msgI,
+                                         final InputStream inputStreamParameter,
                                          final ClientProducerCredits credits) throws HornetQException
    {
       boolean lastPacket = false;
 
+      InputStream input = inputStreamParameter;
+
+      if (session.isCompressLargeMessages())
+      {
+         input = new DeflaterReader(inputStreamParameter);
+      }
+
       while (!lastPacket)
       {
          byte[] buff = new byte[minLargeMessageSize];

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -30,6 +30,7 @@
 
 import org.hornetq.api.core.*;
 import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.core.logging.Logger;
@@ -146,6 +147,8 @@
    public final Exception e = new Exception();
 
    private final Object waitLock = new Object();
+   
+   private boolean compressLargeMessages;
 
    // Static
    // ---------------------------------------------------------------------------------------
@@ -202,6 +205,8 @@
       closeExecutor = orderedExecutorFactory.getExecutor();
 
       this.interceptors = interceptors;
+      
+      compressLargeMessages = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
 
    }
 
@@ -768,6 +773,7 @@
                                                                      serverLocator.isBlockOnDurableSend(),
                                                                      serverLocator.isCacheLargeMessagesClient(),
                                                                      serverLocator.getMinLargeMessageSize(),
+                                                                     compressLargeMessages,
                                                                      serverLocator.getInitialMessagePacketSize(),
                                                                      serverLocator.getGroupID(),
                                                                      connection,
@@ -1358,4 +1364,14 @@
          cancelled = true;
       }
    }
+
+   public void setCompressLargeMessages(boolean compressLargeMessage)
+   {
+      this.compressLargeMessages = compressLargeMessage;
+   }
+
+   public boolean isCompressLargeMessages()
+   {
+      return this.compressLargeMessages;
+   }
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -155,6 +155,8 @@
    private final boolean blockOnDurableSend;
 
    private final int minLargeMessageSize;
+   
+   private final boolean compressLargeMessages;
 
    private volatile int initialMessagePacketSize;
 
@@ -207,6 +209,7 @@
                             final boolean blockOnDurableSend,
                             final boolean cacheLargeMessageClient,
                             final int minLargeMessageSize,
+                            final boolean compressLargeMessages,
                             final int initialMessagePacketSize,
                             final String groupID,
                             final CoreRemotingConnection remotingConnection,
@@ -259,6 +262,8 @@
       this.cacheLargeMessageClient = cacheLargeMessageClient;
 
       this.minLargeMessageSize = minLargeMessageSize;
+      
+      this.compressLargeMessages = compressLargeMessages;
 
       this.initialMessagePacketSize = initialMessagePacketSize;
 
@@ -269,7 +274,7 @@
 
    // ClientSession implementation
    // -----------------------------------------------------------------
-
+   
    public void createQueue(final SimpleString address, final SimpleString queueName) throws HornetQException
    {
       internalCreateQueue(address, queueName, null, false, false);
@@ -684,6 +689,11 @@
    {
       return minLargeMessageSize;
    }
+   
+   public boolean isCompressLargeMessages()
+   {
+      return compressLargeMessages;
+   }
 
    /**
     * @return the cacheLargeMessageClient

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -39,6 +39,8 @@
    boolean isCacheLargeMessageClient();
 
    int getMinLargeMessageSize();
+   
+   boolean isCompressLargeMessages();
 
    void expire(long consumerID, long messageID) throws HornetQException;
 

Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -561,4 +561,9 @@
    {
       session.addMetaData(key, data);
    }
+
+   public boolean isCompressLargeMessages()
+   {
+      return session.isCompressLargeMessages();
+   }
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.SessionFailureListener;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
+
+/**
+ * A ConnectionManager
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 27 Nov 2008 18:45:46
+ *
+ *
+ */
+public interface FailoverManager
+{
+   ClientSession createSession(final String username,
+                               final String password,
+                               final boolean xa,
+                               final boolean autoCommitSends,
+                               final boolean autoCommitAcks,
+                               final boolean preAcknowledge,
+                               final int ackBatchSize,
+                               final boolean cacheLargeMessageClient,
+                               final int minLargeMessageSize,
+                               final boolean compressLargeMessages,
+                               final boolean blockOnAcknowledge,
+                               final boolean autoGroup,
+                               final int confirmationWindowSize,
+                               final int producerWindowSize,
+                               final int consumerWindowSize,
+                               final int producerMaxRate,
+                               final int consumerMaxRate,
+                               final boolean blockOnNonDurableSend,
+                               final boolean blockOnDurableSend,
+                               final int initialMessagePacketSize,
+                               final String groupID) throws HornetQException;
+
+   void removeSession(final ClientSessionInternal session);
+
+   public CoreRemotingConnection getConnection();
+
+   int numConnections();
+
+   int numSessions();
+
+   void addFailureListener(SessionFailureListener listener);
+
+   boolean removeFailureListener(SessionFailureListener listener);
+
+   void causeExit();
+
+}

Modified: trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -287,7 +287,6 @@
     */
    public synchronized boolean waitCompletion(final long timeWait) throws HornetQException
    {
-
       if (outStream == null)
       {
          // There is no stream.. it will never achieve the end of streaming
@@ -1258,11 +1257,12 @@
    {
       try
       {
+         output.write(packet.getBody());
          if (!packet.isContinues())
          {
             streamEnded = true;
+            output.close();
          }
-         output.write(packet.getBody());
       }
       catch (IOException e)
       {

Modified: trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -17,6 +17,7 @@
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
 
 /**
  * A LargeMessageBufferInternal
@@ -55,6 +56,8 @@
     * Saves this buffer to the specified output.
     */
    void saveBuffer(final OutputStream output) throws HornetQException;
+   
+   public void addPacket(final SessionReceiveContinuationMessage packet);
 
    /**
     * Waits for the completion for the specified waiting time (in milliseconds).

Modified: trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/JMSServerManager.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/jms/server/JMSServerManager.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -177,6 +177,7 @@
                                 long callTimeout,
                                 boolean cacheLargeMessagesClient,
                                 int minLargeMessageSize,
+                                boolean compressLargeMessage,
                                 int consumerWindowSize,
                                 int consumerMaxRate,
                                 int confirmationWindowSize,
@@ -211,6 +212,7 @@
                                 long callTimeout,
                                 boolean cacheLargeMessagesClient,
                                 int minLargeMessageSize,
+                                boolean compressLargeMessages,
                                 int consumerWindowSize,
                                 int consumerMaxRate,
                                 int confirmationWindowSize,

Modified: trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -69,6 +69,10 @@
    int getMinLargeMessageSize();
 
    void setMinLargeMessageSize(int minLargeMessageSize);
+   
+   boolean isCompressLargeMessages();
+   
+   void setCompressLargeMessages(boolean compress);   
 
    int getConsumerWindowSize();
 

Modified: trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -60,6 +60,8 @@
    private boolean cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
 
    private int minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+   
+   private boolean compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
 
    private int consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
 
@@ -248,6 +250,16 @@
       this.minLargeMessageSize = minLargeMessageSize;
    }
 
+   public boolean isCompressLargeMessages()
+   {
+      return compressLargeMessage;
+   }
+
+   public void setCompressLargeMessages(final boolean compress)
+   {
+      this.compressLargeMessage = compress;
+   }
+
    public int getConsumerWindowSize()
    {
       return consumerWindowSize;

Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -293,6 +293,11 @@
                                                                 "min-large-message-size",
                                                                 HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
                                                                 Validators.GT_ZERO);
+      
+      boolean compressLargeMessages = XMLConfigurationUtil.getBoolean(e,
+                                                                "compress-large-messages",
+                                                                HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES);
+      
       boolean blockOnAcknowledge = XMLConfigurationUtil.getBoolean(e,
                                                                    "block-on-acknowledge",
                                                                    HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE);
@@ -417,6 +422,7 @@
       cfConfig.setCallTimeout(callTimeout);
       cfConfig.setCacheLargeMessagesClient(cacheLargeMessagesClient);
       cfConfig.setMinLargeMessageSize(minLargeMessageSize);
+      cfConfig.setCompressLargeMessages(compressLargeMessages);
       cfConfig.setConsumerWindowSize(consumerWindowSize);
       cfConfig.setConsumerMaxRate(consumerMaxRate);
       cfConfig.setConfirmationWindowSize(confirmationWindowSize);

Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -18,7 +18,6 @@
 
 import javax.naming.Context;
 import javax.naming.InitialContext;
-import javax.naming.NameNotFoundException;
 import javax.naming.NamingException;
 import javax.transaction.xa.Xid;
 
@@ -723,6 +722,7 @@
                                                     final long callTimeout,
                                                     final boolean cacheLargeMessagesClient,
                                                     final int minLargeMessageSize,
+                                                    final boolean compressLargeMessage,
                                                     final int consumerWindowSize,
                                                     final int consumerMaxRate,
                                                     final int confirmationWindowSize,
@@ -761,6 +761,7 @@
          configuration.setCallTimeout(callTimeout);
          configuration.setCacheLargeMessagesClient(cacheLargeMessagesClient);
          configuration.setMinLargeMessageSize(minLargeMessageSize);
+         configuration.setCompressLargeMessages(compressLargeMessage);
          configuration.setConsumerWindowSize(consumerWindowSize);
          configuration.setConsumerMaxRate(consumerMaxRate);
          configuration.setConfirmationWindowSize(confirmationWindowSize);
@@ -797,6 +798,7 @@
                                                     final long callTimeout,
                                                     final boolean cacheLargeMessagesClient,
                                                     final int minLargeMessageSize,
+                                                    final boolean compressLargeMessages,
                                                     final int consumerWindowSize,
                                                     final int consumerMaxRate,
                                                     final int confirmationWindowSize,
@@ -836,6 +838,7 @@
          configuration.setCallTimeout(callTimeout);
          configuration.setCacheLargeMessagesClient(cacheLargeMessagesClient);
          configuration.setMinLargeMessageSize(minLargeMessageSize);
+         configuration.setCompressLargeMessages(compressLargeMessages);
          configuration.setConsumerWindowSize(consumerWindowSize);
          configuration.setConsumerMaxRate(consumerMaxRate);
          configuration.setConfirmationWindowSize(confirmationWindowSize);

Added: trunk/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,1091 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.util.concurrent.Executor;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.client.impl.LargeMessageBufferInternal;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * A DecompressedHornetQBuffer
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class DecompressedLargeMessageBuffer implements LargeMessageBufferInternal
+{
+
+   // Constants -----------------------------------------------------
+
+   private static final String OPERATION_NOT_SUPPORTED = "Operation not supported";
+
+   private static final String READ_ONLY_ERROR_MESSAGE = "This is a read-only buffer, setOperations are not supported";
+
+   // Attributes ----------------------------------------------------
+
+   final LargeMessageBufferInternal bufferDelegate;
+   
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public DecompressedLargeMessageBuffer(final LargeMessageBufferInternal bufferDelegate)
+   {
+      this.bufferDelegate = bufferDelegate;
+   }
+
+
+   // Public --------------------------------------------------------
+
+   /**
+    * 
+    */
+   public void discardUnusedPackets()
+   {
+      bufferDelegate.discardUnusedPackets();
+   }
+
+   /**
+    * Add a buff to the List, or save it to the OutputStream if set
+    * @param packet
+    */
+   public void addPacket(final SessionReceiveContinuationMessage packet)
+   {
+      bufferDelegate.addPacket(packet);
+   }
+
+   public synchronized void cancel()
+   {
+      bufferDelegate.cancel();
+   }
+
+   public synchronized void close()
+   {
+      bufferDelegate.cancel();
+   }
+
+   public void setOutputStream(final OutputStream output) throws HornetQException
+   {
+      bufferDelegate.setOutputStream(new InflaterWriter(output));
+   }
+
+   public synchronized void saveBuffer(final OutputStream output) throws HornetQException
+   {
+      setOutputStream(output);
+      waitCompletion(0);
+   }
+
+   /**
+    * 
+    * @param timeWait Milliseconds to Wait. 0 means forever
+    * @throws Exception
+    */
+   public synchronized boolean waitCompletion(final long timeWait) throws HornetQException
+   {
+      return bufferDelegate.waitCompletion(timeWait);
+   }
+
+   // Channel Buffer Implementation ---------------------------------
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#array()
+    */
+   public byte[] array()
+   {
+      throw new IllegalAccessError("array not supported on LargeMessageBufferImpl");
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#capacity()
+    */
+   public int capacity()
+   {
+      return -1;
+   }
+   
+   DataInputStream dataInput = null;
+   
+   private DataInputStream getStream()
+   {
+      if (dataInput == null)
+      {
+         try
+         {
+            InputStream input = new HornetQBufferInputStream(bufferDelegate);
+            
+            dataInput = new DataInputStream(new InflaterReader(input));
+         }
+         catch (Exception e)
+         {
+            throw new RuntimeException (e.getMessage(), e);
+         }
+         
+      }
+      return dataInput;
+   }
+   
+   private void positioningNotSupported()
+   {
+      throw new IllegalStateException("Position not supported over compressed large messages");
+   }
+
+   public byte readByte()
+   {
+      try
+      {
+         return getStream().readByte();
+      }
+      catch (Exception e)
+      {
+         throw new RuntimeException (e.getMessage(), e);
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getByte(int)
+    */
+   public byte getByte(final int index)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   private byte getByte(final long index)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, org.hornetq.api.core.buffers.ChannelBuffer, int, int)
+    */
+   public void getBytes(final int index, final HornetQBuffer dst, final int dstIndex, final int length)
+   {
+      positioningNotSupported();
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, org.hornetq.api.core.buffers.ChannelBuffer, int, int)
+    */
+   public void getBytes(final long index, final HornetQBuffer dst, final int dstIndex, final int length)
+   {
+      positioningNotSupported();
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, byte[], int, int)
+    */
+   public void getBytes(final int index, final byte[] dst, final int dstIndex, final int length)
+   {
+      positioningNotSupported();
+   }
+
+   public void getBytes(final long index, final byte[] dst, final int dstIndex, final int length)
+   {
+      positioningNotSupported();
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, java.nio.ByteBuffer)
+    */
+   public void getBytes(final int index, final ByteBuffer dst)
+   {
+      positioningNotSupported();
+   }
+
+   public void getBytes(final long index, final ByteBuffer dst)
+   {
+      positioningNotSupported();
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, java.io.OutputStream, int)
+    */
+   public void getBytes(final int index, final OutputStream out, final int length) throws IOException
+   {
+      positioningNotSupported();
+   }
+
+   public void getBytes(final long index, final OutputStream out, final int length) throws IOException
+   {
+      positioningNotSupported();
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, java.nio.channels.GatheringByteChannel, int)
+    */
+   public int getBytes(final int index, final GatheringByteChannel out, final int length) throws IOException
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getInt(int)
+    */
+   public int getInt(final int index)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   public int getInt(final long index)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getLong(int)
+    */
+   public long getLong(final int index)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   public long getLong(final long index)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getShort(int)
+    */
+   public short getShort(final int index)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   public short getShort(final long index)
+   {
+      return (short)(getByte(index) << 8 | getByte(index + 1) & 0xFF);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getUnsignedMedium(int)
+    */
+   public int getUnsignedMedium(final int index)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+   
+   
+
+   public int getUnsignedMedium(final long index)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#setByte(int, byte)
+    */
+   public void setByte(final int index, final byte value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, org.hornetq.api.core.buffers.ChannelBuffer, int, int)
+    */
+   public void setBytes(final int index, final HornetQBuffer src, final int srcIndex, final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, byte[], int, int)
+    */
+   public void setBytes(final int index, final byte[] src, final int srcIndex, final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, java.nio.ByteBuffer)
+    */
+   public void setBytes(final int index, final ByteBuffer src)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, java.io.InputStream, int)
+    */
+   public int setBytes(final int index, final InputStream in, final int length) throws IOException
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, java.nio.channels.ScatteringByteChannel, int)
+    */
+   public int setBytes(final int index, final ScatteringByteChannel in, final int length) throws IOException
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#setInt(int, int)
+    */
+   public void setInt(final int index, final int value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#setLong(int, long)
+    */
+   public void setLong(final int index, final long value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#setMedium(int, int)
+    */
+   public void setMedium(final int index, final int value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#setShort(int, short)
+    */
+   public void setShort(final int index, final short value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#toByteBuffer(int, int)
+    */
+   public ByteBuffer toByteBuffer(final int index, final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#toString(int, int, java.lang.String)
+    */
+   public String toString(final int index, final int length, final String charsetName)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public int readerIndex()
+   {
+      return 0;
+   }
+
+   public void readerIndex(final int readerIndex)
+   {
+      // TODO
+   }
+
+   public int writerIndex()
+   {
+      // TODO
+      return 0;
+   }
+
+   public long getSize()
+   {
+      // TODO
+      return 0;
+   }
+
+   public void writerIndex(final int writerIndex)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void setIndex(final int readerIndex, final int writerIndex)
+   {
+      positioningNotSupported();
+   }
+
+   public void clear()
+   {
+   }
+
+   public boolean readable()
+   {
+      return true;
+   }
+
+   public boolean writable()
+   {
+      return false;
+   }
+
+   public int readableBytes()
+   {
+      return 1;
+   }
+
+   public int writableBytes()
+   {
+      return 0;
+   }
+
+   public void markReaderIndex()
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void resetReaderIndex()
+   {
+      // TODO: reset positioning if possible
+   }
+
+   public void markWriterIndex()
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void resetWriterIndex()
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void discardReadBytes()
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public short getUnsignedByte(final int index)
+   {
+      return (short)(getByte(index) & 0xFF);
+   }
+
+   public int getUnsignedShort(final int index)
+   {
+      return getShort(index) & 0xFFFF;
+   }
+
+   public int getMedium(final int index)
+   {
+      int value = getUnsignedMedium(index);
+      if ((value & 0x800000) != 0)
+      {
+         value |= 0xff000000;
+      }
+      return value;
+   }
+
+   public long getUnsignedInt(final int index)
+   {
+      return getInt(index) & 0xFFFFFFFFL;
+   }
+
+   public void getBytes(int index, final byte[] dst)
+   {
+      // TODO: optimize this by using System.arraycopy
+      for (int i = 0; i < dst.length; i++)
+      {
+         dst[i] = getByte(index++);
+      }
+   }
+
+   public void getBytes(long index, final byte[] dst)
+   {
+      // TODO: optimize this by using System.arraycopy
+      for (int i = 0; i < dst.length; i++)
+      {
+         dst[i] = getByte(index++);
+      }
+   }
+
+   public void getBytes(final int index, final HornetQBuffer dst)
+   {
+      getBytes(index, dst, dst.writableBytes());
+   }
+
+   public void getBytes(final int index, final HornetQBuffer dst, final int length)
+   {
+      if (length > dst.writableBytes())
+      {
+         throw new IndexOutOfBoundsException();
+      }
+      getBytes(index, dst, dst.writerIndex(), length);
+      dst.writerIndex(dst.writerIndex() + length);
+   }
+
+   public void setBytes(final int index, final byte[] src)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void setBytes(final int index, final HornetQBuffer src)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void setBytes(final int index, final HornetQBuffer src, final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void setZero(final int index, final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public short readUnsignedByte()
+   {
+      try
+      {
+         return (short)getStream().readUnsignedByte();
+      }
+      catch (Exception e)
+      {
+         throw new IllegalStateException (e.getMessage(), e);
+      }
+   }
+
+   public short readShort()
+   {
+      try
+      {
+         return (short)getStream().readShort();
+      }
+      catch (Exception e)
+      {
+         throw new IllegalStateException (e.getMessage(), e);
+      }
+   }
+
+   public int readUnsignedShort()
+   {
+      try
+      {
+         return (int)getStream().readUnsignedShort();
+      }
+      catch (Exception e)
+      {
+         throw new IllegalStateException (e.getMessage(), e);
+      }
+   }
+
+   public int readMedium()
+   {
+      int value = readUnsignedMedium();
+      if ((value & 0x800000) != 0)
+      {
+         value |= 0xff000000;
+      }
+      return value;
+   }
+
+   
+   public int readUnsignedMedium()
+   {
+      return (readByte() & 0xff) << 16 | (readByte() & 0xff) << 8 | (readByte() & 0xff) << 0;
+   }
+
+   public int readInt()
+   {
+      try
+      {
+         return getStream().readInt();
+      }
+      catch (Exception e)
+      {
+         throw new IllegalStateException(e.getMessage(), e);
+      }
+   }
+
+   public int readInt(final int pos)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   public long readUnsignedInt()
+   {
+      return readInt() & 0xFFFFFFFFL;
+   }
+
+   public long readLong()
+   {
+      try
+      {
+         return getStream().readLong();
+      }
+      catch (Exception e)
+      {
+         throw new IllegalStateException(e.getMessage(), e);
+      }
+   }
+
+   public void readBytes(final byte[] dst, final int dstIndex, final int length)
+   {
+      try
+      {
+         getStream().read(dst, dstIndex, length);
+      }
+      catch (Exception e)
+      {
+         throw new IllegalStateException(e.getMessage(), e);
+      }
+   }
+
+   public void readBytes(final byte[] dst)
+   {
+      readBytes(dst, 0, dst.length);
+   }
+
+   public void readBytes(final HornetQBuffer dst)
+   {
+      readBytes(dst, dst.writableBytes());
+   }
+
+   public void readBytes(final HornetQBuffer dst, final int length)
+   {
+      if (length > dst.writableBytes())
+      {
+         throw new IndexOutOfBoundsException();
+      }
+      readBytes(dst, dst.writerIndex(), length);
+      dst.writerIndex(dst.writerIndex() + length);
+   }
+
+   public void readBytes(final HornetQBuffer dst, final int dstIndex, final int length)
+   {
+      byte[] destBytes = new byte[length];
+      readBytes(destBytes);
+      dst.setBytes(dstIndex, destBytes);
+   }
+
+   public void readBytes(final ByteBuffer dst)
+   {
+      byte bytesToGet[] = new byte[dst.remaining()];
+      readBytes(bytesToGet);
+      dst.put(bytesToGet);
+   }
+
+   public int readBytes(final GatheringByteChannel out, final int length) throws IOException
+   {
+      throw new IllegalStateException("Not implemented!");
+   }
+
+   public void readBytes(final OutputStream out, final int length) throws IOException
+   {
+      throw new IllegalStateException("Not implemented!");
+   }
+
+   public void skipBytes(final int length)
+   {
+    
+      try
+      {
+         for (int i = 0 ; i < length; i++)
+         {
+            getStream().read();
+         }
+      }
+      catch (Exception e)
+      {
+         throw new IllegalStateException(e.getMessage(), e);
+      }
+   }
+
+   public void writeByte(final byte value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeShort(final short value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeMedium(final int value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeInt(final int value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeLong(final long value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeBytes(final byte[] src, final int srcIndex, final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeBytes(final byte[] src)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeBytes(final HornetQBuffer src)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeBytes(final HornetQBuffer src, final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeBytes(final ByteBuffer src)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public int writeBytes(final InputStream in, final int length) throws IOException
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public int writeBytes(final ScatteringByteChannel in, final int length) throws IOException
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeZero(final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public ByteBuffer toByteBuffer()
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public ByteBuffer[] toByteBuffers()
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public ByteBuffer[] toByteBuffers(final int index, final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public String toString(final String charsetName)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public Object getUnderlyingBuffer()
+   {
+      return this;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#readBoolean()
+    */
+   public boolean readBoolean()
+   {
+      return readByte() != 0;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#readChar()
+    */
+   public char readChar()
+   {
+      return (char)readShort();
+   }
+   
+   public char getChar(final int index)
+   {
+      return (char)getShort(index);
+   }
+
+   public double getDouble(final int index)
+   {
+      return Double.longBitsToDouble(getLong(index));
+   }
+
+   public float getFloat(final int index)
+   {
+      return Float.intBitsToFloat(getInt(index));
+   }
+
+   public HornetQBuffer readBytes(final int length)
+   {
+      byte bytesToGet[] = new byte[length];
+      readBytes(bytesToGet);
+      return HornetQBuffers.wrappedBuffer(bytesToGet);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#readDouble()
+    */
+   public double readDouble()
+   {
+      return Double.longBitsToDouble(readLong());
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#readFloat()
+    */
+   public float readFloat()
+   {
+      return Float.intBitsToFloat(readInt());
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#readNullableSimpleString()
+    */
+   public SimpleString readNullableSimpleString()
+   {
+      int b = readByte();
+      if (b == DataConstants.NULL)
+      {
+         return null;
+      }
+      else
+      {
+         return readSimpleString();
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#readNullableString()
+    */
+   public String readNullableString()
+   {
+      int b = readByte();
+      if (b == DataConstants.NULL)
+      {
+         return null;
+      }
+      else
+      {
+         return readString();
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#readSimpleString()
+    */
+   public SimpleString readSimpleString()
+   {
+      int len = readInt();
+      byte[] data = new byte[len];
+      readBytes(data);
+      return new SimpleString(data);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#readString()
+    */
+   public String readString()
+   {
+      int len = readInt();
+
+      if (len < 9)
+      {
+         char[] chars = new char[len];
+         for (int i = 0; i < len; i++)
+         {
+            chars[i] = (char)readShort();
+         }
+         return new String(chars);
+      }
+      else if (len < 0xfff)
+      {
+         return readUTF();
+      }
+      else
+      {
+         return readSimpleString().toString();
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#readUTF()
+    */
+   public String readUTF()
+   {
+      return UTF8Util.readUTF(this);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeBoolean(boolean)
+    */
+   public void writeBoolean(final boolean val)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeChar(char)
+    */
+   public void writeChar(final char val)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeDouble(double)
+    */
+   public void writeDouble(final double val)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeFloat(float)
+    */
+   public void writeFloat(final float val)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeNullableSimpleString(org.hornetq.util.SimpleString)
+    */
+   public void writeNullableSimpleString(final SimpleString val)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeNullableString(java.lang.String)
+    */
+   public void writeNullableString(final String val)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeSimpleString(org.hornetq.util.SimpleString)
+    */
+   public void writeSimpleString(final SimpleString val)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeString(java.lang.String)
+    */
+   public void writeString(final String val)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeUTF(java.lang.String)
+    */
+   public void writeUTF(final String utf)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#compareTo(org.hornetq.api.core.buffers.ChannelBuffer)
+    */
+   public int compareTo(final HornetQBuffer buffer)
+   {
+      return -1;
+   }
+
+   public HornetQBuffer copy()
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public HornetQBuffer slice(final int index, final int length)
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * @param body
+    */
+   // Inner classes -------------------------------------------------
+
+   public ChannelBuffer channelBuffer()
+   {
+      return null;
+   }
+
+   public HornetQBuffer copy(final int index, final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public HornetQBuffer duplicate()
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public HornetQBuffer readSlice(final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void setChar(final int index, final char value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void setDouble(final int index, final double value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void setFloat(final int index, final float value)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public HornetQBuffer slice()
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+   public void writeBytes(final HornetQBuffer src, final int srcIndex, final int length)
+   {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+}

Added: trunk/src/main/org/hornetq/utils/DeflaterReader.java
===================================================================
--- trunk/src/main/org/hornetq/utils/DeflaterReader.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/utils/DeflaterReader.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.Deflater;
+
+/**
+ * A DeflaterReader
+ * The reader takes an inputstream and compress it.
+ * Not for concurrent use.
+
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ */
+public class DeflaterReader extends InputStream
+{
+   private Deflater deflater = new Deflater();
+   private boolean isFinished = false;
+   private boolean compressDone = false;
+
+   private InputStream input;
+   
+   public DeflaterReader(InputStream inData)
+   {
+      input = inData;
+   }
+
+   public int read() throws IOException
+   {
+      byte[] buffer = new byte[1];
+      int n = read(buffer, 0, 1);
+      if (n == 1)
+      {
+         return (int)buffer[0] & 0xFF;
+      }
+      if (n == -1 || n == 0)
+      {
+         return -1;
+      }
+      throw new IOException("Error reading data, invalid n: " + n);
+   }
+
+   /**
+    * Try to fill the buffer with compressed bytes. Except the last effective read,
+    * this method always returns with a full buffer of compressed data.
+    * 
+    * @param buffer the buffer to fill compressed bytes
+    * @return the number of bytes really filled, -1 indicates end.
+    * @throws IOException 
+    */
+   @Override
+   public int read(byte[] buffer, int offset, int len) throws IOException
+   {
+      if (compressDone)
+      {
+         return -1;
+      }
+      
+      //buffer for reading input stream
+      byte[] readBuffer = new byte[2 * len];
+
+      int n = 0;
+      int read = 0;
+
+      while (len > 0)
+      {
+         n = deflater.deflate(buffer, offset, len);
+         if (n == 0)
+         {
+            if (isFinished)
+            {
+               deflater.end();
+               compressDone = true;
+               break;
+            }
+            else if (deflater.needsInput())
+            {
+               // read some data from inputstream
+               int m = input.read(readBuffer);
+               
+               if (m == -1)
+               {
+                  deflater.finish();
+                  isFinished = true;
+               }
+               else
+               {
+                  deflater.setInput(readBuffer, 0, m);
+               }
+            }
+            else
+            {
+               deflater.finish();
+               isFinished = true;
+            }
+         }
+         else
+         {
+            read += n;
+            offset += n;
+            len -= n;
+         }
+      }
+      return read;
+   }
+   
+}

Added: trunk/src/main/org/hornetq/utils/HornetQBufferInputStream.java
===================================================================
--- trunk/src/main/org/hornetq/utils/HornetQBufferInputStream.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/utils/HornetQBufferInputStream.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * Used to send large messages
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class HornetQBufferInputStream extends InputStream
+{
+
+   /* (non-Javadoc)
+    * @see java.io.InputStream#read()
+    */
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   private HornetQBuffer bb;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public HornetQBufferInputStream(final HornetQBuffer paramByteBuffer)
+   {
+      bb = paramByteBuffer;
+   }
+
+   @Override
+   public int read() throws IOException
+   {
+      if (bb == null)
+      {
+         throw new IOException("read on a closed InputStream");
+      }
+
+      if (remainingBytes() == 0)
+      {
+         return -1;
+      }
+      else
+      {
+         return bb.readByte() & 0xFF;
+      }
+   }
+
+   @Override
+   public int read(final byte[] byteArray) throws IOException
+   {
+      if (bb == null)
+      {
+         throw new IOException("read on a closed InputStream");
+      }
+
+      return read(byteArray, 0, byteArray.length);
+   }
+
+   @Override
+   public int read(final byte[] byteArray, final int off, final int len) throws IOException
+   {
+      if (bb == null)
+      {
+         throw new IOException("read on a closed InputStream");
+      }
+
+      if (byteArray == null)
+      {
+         throw new NullPointerException();
+      }
+      if (off < 0 || off > byteArray.length || len < 0 || off + len > byteArray.length || off + len < 0)
+      {
+         throw new IndexOutOfBoundsException();
+      }
+      if (len == 0)
+      {
+         return 0;
+      }
+
+      int size = Math.min(remainingBytes(), len);
+      
+      if (size == 0)
+      {
+         return -1;
+      }
+
+      bb.readBytes(byteArray, off, size);
+      return size;
+   }
+
+   @Override
+   public long skip(final long len) throws IOException
+   {
+      if (bb == null)
+      {
+         throw new IOException("skip on a closed InputStream");
+      }
+
+      if (len <= 0L)
+      {
+         return 0L;
+      }
+
+      int size = Math.min(remainingBytes(), (int) len);
+
+      bb.skipBytes((int)size);
+
+      return size;
+   }
+
+   @Override
+   public int available() throws IOException
+   {
+      if (bb == null)
+      {
+         throw new IOException("available on a closed InputStream");
+      }
+
+      return remainingBytes();
+   }
+
+   @Override
+   public void close() throws IOException
+   {
+      bb = null;
+   }
+
+   @Override
+   public synchronized void mark(final int paramInt)
+   {
+   }
+
+   @Override
+   public synchronized void reset() throws IOException
+   {
+      throw new IOException("mark/reset not supported");
+   }
+
+   @Override
+   public boolean markSupported()
+   {
+      return false;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * @return
+    */
+   private int remainingBytes()
+   {
+      return bb.writerIndex() - bb.readerIndex();
+   }
+
+
+   // Inner classes -------------------------------------------------
+
+}

Added: trunk/src/main/org/hornetq/utils/InflaterReader.java
===================================================================
--- trunk/src/main/org/hornetq/utils/InflaterReader.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/utils/InflaterReader.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+
+/**
+ * An InflaterReader
+ * It takes an compressed input stream and decompressed it as it is being read.
+ * Not for concurrent use.
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ */
+public class InflaterReader extends InputStream
+{
+   private Inflater inflater = new Inflater();
+   
+   private InputStream input;
+   
+   private byte[] readBuffer;
+   private int pointer;
+   private int length;
+   
+   public InflaterReader(InputStream input)
+   {
+      this(input, 1024);
+   }
+   
+   public InflaterReader(InputStream input, int bufferSize)
+   {
+      this.input = input;
+      this.readBuffer = new byte[bufferSize];
+      this.pointer = -1;
+   }
+   
+   public int read() throws IOException
+   {
+      if (pointer == -1)
+      {
+         try
+         {
+            length = doRead(readBuffer, 0, readBuffer.length);
+            if (length == 0)
+            {
+               return -1;
+            }
+            pointer = 0;
+         }
+         catch (DataFormatException e)
+         {
+            throw new IOException(e);
+         }
+      }
+      
+      int value = readBuffer[pointer] & 0xFF;
+      pointer++;
+      if (pointer == length)
+      {
+         pointer = -1;
+      }
+      
+      return value;
+   }
+   
+   /*
+    * feed inflater more bytes in order to get some
+    * decompressed output.
+    * returns number of bytes actually got
+    */
+   private int doRead(byte[] buf, int offset, int len) throws DataFormatException, IOException
+   {
+      int read = 0;
+      int n = 0;
+      byte[] inputBuffer = new byte[len];
+      
+      while (len > 0)
+      {
+         n = inflater.inflate(buf, offset, len);
+         if (n == 0)
+         {
+            if (inflater.finished())
+            {
+               break;
+            }
+            else if (inflater.needsInput())
+            {
+               //feeding
+               int m = input.read(inputBuffer);
+               
+               if (m == -1)
+               {
+                  //it shouldn't be here, throw exception
+                  throw new DataFormatException("Input is over while inflater still expecting data");
+               }
+               else
+               {
+                  //feed the data in
+                  inflater.setInput(inputBuffer);
+                  n = inflater.inflate(buf, offset, len);
+                  if (n > 0)
+                  {
+                     read += n;
+                     offset += n;
+                     len -= n;
+                  }
+               }
+            }
+            else
+            {
+               //it shouldn't be here, throw
+               throw new DataFormatException("Inflater is neither finished nor needing input.");
+            }
+         }
+         else
+         {
+            read += n;
+            offset += n;
+            len -= n;
+         }
+      }
+      return read;
+   }
+
+}

Added: trunk/src/main/org/hornetq/utils/InflaterWriter.java
===================================================================
--- trunk/src/main/org/hornetq/utils/InflaterWriter.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/utils/InflaterWriter.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+
+/**
+ * A InflaterWriter
+ * 
+ * This class takes an OutputStream. Compressed bytes 
+ * can directly be written into this class. The class will
+ * decompress the bytes and write them to the output stream.
+ * 
+ * Not for concurrent use.
+ * 
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ */
+public class InflaterWriter extends OutputStream
+{
+   private Inflater inflater = new Inflater();
+   private OutputStream output;
+   
+   private byte[] writeBuffer = new byte[1024];
+   private int writePointer = 0;
+   
+   private byte[] outputBuffer = new byte[writeBuffer.length*2];
+   
+   public InflaterWriter(OutputStream output)
+   {
+      this.output = output;
+   }
+
+   /*
+    * Write a compressed byte.
+    */
+   @Override
+   public void write(int b) throws IOException
+   {
+      writeBuffer[writePointer] = (byte)(b & 0xFF);
+      writePointer++;
+      
+      if (writePointer == writeBuffer.length)
+      {
+         writePointer = 0;
+         try
+         {
+            doWrite();
+         }
+         catch (DataFormatException e)
+         {
+            throw new IOException("Error decompressing data", e);
+         }
+      }
+   }
+   
+   @Override
+   public void close() throws IOException
+   {
+      if (writePointer > 0)
+      {
+         inflater.setInput(writeBuffer, 0, writePointer);
+         try
+         {
+            int n = inflater.inflate(outputBuffer);
+            while (n > 0)
+            {
+               output.write(outputBuffer, 0, n);
+               n = inflater.inflate(outputBuffer);
+            }
+            output.close();
+         }
+         catch (DataFormatException e)
+         {
+            throw new IOException(e);
+         }
+      }
+   }
+   
+   private void doWrite() throws DataFormatException, IOException
+   {
+      inflater.setInput(writeBuffer);
+      int n = inflater.inflate(outputBuffer);
+      
+      while (n > 0)
+      {
+         output.write(outputBuffer, 0, n);
+         n = inflater.inflate(outputBuffer);
+      }
+   }
+
+}

Modified: trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
===================================================================
--- trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml	2010-12-05 10:00:09 UTC (rev 9993)
@@ -20,6 +20,7 @@
       <producer-window-size>7712652</producer-window-size>
       <producer-max-rate>789</producer-max-rate>
       <min-large-message-size>12</min-large-message-size>
+      <compress-large-messages>true</compress-large-messages>
       <client-id>TestClientID</client-id>
       <dups-ok-batch-size>3456</dups-ok-batch-size>
       <transaction-batch-size>4567</transaction-batch-size>

Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -68,6 +68,7 @@
                                                        HornetQClient.DEFAULT_CALL_TIMEOUT,
                                                        HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                                        HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                       HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                                        HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                                        HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                                        HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,

Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -14,15 +14,10 @@
 package org.hornetq.jms.tests;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 
 import javax.naming.InitialContext;
 
-import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
-import org.hornetq.jms.client.HornetQConnectionFactory;
 import org.hornetq.jms.client.HornetQJMSConnectionFactory;
 import org.hornetq.jms.client.HornetQQueueConnectionFactory;
 import org.hornetq.jms.client.HornetQTopicConnectionFactory;
@@ -81,6 +76,7 @@
                                                     HornetQClient.DEFAULT_CALL_TIMEOUT,
                                                     HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                                     HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                    HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                                     HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                                     HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                                     HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
@@ -115,6 +111,7 @@
                                                     HornetQClient.DEFAULT_CALL_TIMEOUT,
                                                     HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                                     HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                    HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                                     HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                                     HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                                     HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
@@ -149,6 +146,7 @@
                                                     HornetQClient.DEFAULT_CALL_TIMEOUT,
                                                     HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                                     HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                    HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                                     HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                                     HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                                     HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,

Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -303,6 +303,7 @@
                                                     HornetQClient.DEFAULT_CALL_TIMEOUT,
                                                     HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                                     HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                    HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                                     prefetchSize,
                                                     HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                                     HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,

Copied: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java (from rev 9972, branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,310 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+
+/**
+ * A LargeMessageCompressTest
+ *
+ * Just extend the LargeMessageTest
+ * 
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ * 
+ *
+ */
+public class LargeMessageCompressTest extends LargeMessageTest
+{
+   // Constructors --------------------------------------------------
+
+   protected boolean isNetty()
+   {
+      return false;
+   }
+
+   protected ClientSessionFactory createSessionFactory() throws Exception
+   {
+      ClientSessionFactory sf = locator.createSessionFactory();
+      sf.setCompressLargeMessages(true);
+      return sf;
+   }
+
+
+   public void testLargeMessageCompression() throws Exception
+   {
+      final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+      ClientSession session = null;
+
+      try
+      {
+         server = createServer(true, isNetty());
+
+         server.start();
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+         sf.setCompressLargeMessages(true);
+
+         session = sf.createSession(false, false, false);
+
+         session.createTemporaryQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+         Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+         producer.send(clientFile);
+
+         session.commit();
+
+         session.start();
+
+         ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+         ClientMessage msg1 = consumer.receive(1000);
+         Assert.assertNotNull(msg1);
+         
+         for (int i = 0 ; i < messageSize; i++)
+         {
+            byte b = msg1.getBodyBuffer().readByte();
+            assertEquals("position = "  + i, getSamplebyte(i), b);
+         }
+
+         msg1.acknowledge();
+         session.commit();
+
+         consumer.close();
+
+         session.close();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
+   public void testLargeMessageCompression2() throws Exception
+   {
+      final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+      ClientSession session = null;
+
+      try
+      {
+         server = createServer(true, isNetty());
+
+         server.start();
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+         sf.setCompressLargeMessages(true);
+
+         session = sf.createSession(false, false, false);
+
+         session.createTemporaryQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+         Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+         producer.send(clientFile);
+
+         session.commit();
+
+         session.start();
+
+         ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+         ClientMessage msg1 = consumer.receive(1000);
+         Assert.assertNotNull(msg1);
+         
+         String testDir = this.getTestDir();
+         File testFile = new File(testDir, "async_large_message");
+         FileOutputStream output = new FileOutputStream(testFile);
+         
+         msg1.setOutputStream(output);
+
+         msg1.waitOutputStreamCompletion(0);         
+         
+         msg1.acknowledge();
+
+         session.commit();
+
+         consumer.close();
+
+         session.close();
+
+         //verify
+         FileInputStream input = new FileInputStream(testFile);
+         for (int i = 0 ; i < messageSize; i++)
+         {
+            byte b = (byte)input.read();
+            assertEquals("position = "  + i, getSamplebyte(i), b);
+         }
+         
+         testFile.delete();
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
+   public void testLargeMessageCompression3() throws Exception
+   {
+      final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+      ClientSession session = null;
+
+      try
+      {
+         server = createServer(true, isNetty());
+
+         server.start();
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+         sf.setCompressLargeMessages(true);
+
+         session = sf.createSession(false, false, false);
+
+         session.createTemporaryQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+         Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+         producer.send(clientFile);
+
+         session.commit();
+
+         session.start();
+
+         ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+         ClientMessage msg1 = consumer.receive(1000);
+         Assert.assertNotNull(msg1);
+         
+         String testDir = this.getTestDir();
+         File testFile = new File(testDir, "async_large_message");
+         FileOutputStream output = new FileOutputStream(testFile);
+
+         msg1.saveToOutputStream(output);
+         
+         msg1.acknowledge();
+
+         session.commit();
+
+         consumer.close();
+
+         session.close();
+
+         //verify
+         FileInputStream input = new FileInputStream(testFile);
+         for (int i = 0 ; i < messageSize; i++)
+         {
+            byte b = (byte)input.read();
+            assertEquals("position = "  + i, getSamplebyte(i), b);
+         }
+         
+         testFile.delete();
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
+
+   // below are large message tests that are not applied to compressed messages 
+
+   public void testResendSmallStreamMessage() throws Exception
+   {
+   }
+
+   public void testResendLargeStreamMessage() throws Exception
+   {
+   }
+
+   public void testResendCachedSmallStreamMessage() throws Exception
+   {
+   }
+
+   public void testResendCachedLargeStreamMessage() throws Exception
+   {
+   }
+
+   public void testSimpleRollback() throws Exception
+   {
+   }
+
+   public void testSimpleRollbackXA() throws Exception
+   {
+   }
+
+
+}

Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -59,7 +59,7 @@
    // Static --------------------------------------------------------
    private final Logger log = Logger.getLogger(LargeMessageTest.class);
 
-   private ServerLocator locator;
+   protected ServerLocator locator;
 
    // Constructors --------------------------------------------------
 
@@ -70,6 +70,11 @@
       return false;
    }
 
+   protected ClientSessionFactory createSessionFactory() throws Exception
+   {
+      return locator.createSessionFactory();
+   }
+   
    public void testCloseConsumer() throws Exception
    {
       final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -82,7 +87,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = createSessionFactory();
 
          session = sf.createSession(false, false, false);
 
@@ -153,7 +158,7 @@
    public void doTestLargeBuffer(boolean transacted) throws Exception
    {
       final int journalsize = 100 * 1024;
-      final int messageSize = 3 * journalsize;
+      final int messageSize = 3 * journalsize + 5;
       // final int messageSize = 5 * 1024;
 
       ClientSession session = null;
@@ -170,7 +175,9 @@
 
          server.start();
 
-         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = createSessionFactory();
+         
+         sf.setCompressLargeMessages(true);
 
          session = sf.createSession(!transacted, !transacted, 0);
 
@@ -254,7 +261,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = createSessionFactory();
 
          session = sf.createSession(false, false, false);
 
@@ -307,7 +314,7 @@
 
          server.start();
 
-         sf = locator.createSessionFactory();
+         sf = createSessionFactory();
 
          session = sf.createSession(false, false, false);
 
@@ -381,7 +388,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = createSessionFactory();
 
          session = sf.createSession(false, false, false);
 
@@ -464,7 +471,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = createSessionFactory();
 
          SimpleString ADDRESS_DLA = LargeMessageTest.ADDRESS.concat("-dla");
          SimpleString ADDRESS_EXPIRY = LargeMessageTest.ADDRESS.concat("-expiry");
@@ -599,7 +606,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = createSessionFactory();
 
          SimpleString ADDRESS_DLA = LargeMessageTest.ADDRESS.concat("-dla");
          SimpleString ADDRESS_EXPIRY = LargeMessageTest.ADDRESS.concat("-expiry");
@@ -675,7 +682,7 @@
 
          server.start();
 
-         sf = locator.createSessionFactory();
+         sf = createSessionFactory();
 
          session = sf.createSession(false, false, false);
 
@@ -742,7 +749,7 @@
 
          server.getAddressSettingsRepository().addMatch("*", addressSettings);
 
-         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = createSessionFactory();
 
          session = sf.createSession(false, false, false);
 
@@ -784,7 +791,7 @@
 
          server.start();
 
-         sf = locator.createSessionFactory();
+         sf = createSessionFactory();
 
          session = sf.createSession(false, false, false);
 
@@ -849,7 +856,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = createSessionFactory();
 
          session = sf.createSession(false, false, false);
 
@@ -887,6 +894,7 @@
          }
          catch (Throwable e)
          {
+            log.error("failed", e);
             failed = true;
          }
 
@@ -955,7 +963,7 @@
          
          locator.setCacheLargeMessagesClient(true);
 
-         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = createSessionFactory();
 
          session = sf.createSession(false, false, false);
 
@@ -1887,7 +1895,7 @@
 
          SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
 
-         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = createSessionFactory();
 
          ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
 
@@ -1970,7 +1978,7 @@
 
          SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
 
-         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = createSessionFactory();
 
          ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
 
@@ -1998,7 +2006,7 @@
 
             server.start();
 
-            sf = locator.createSessionFactory();
+            sf = createSessionFactory();
 
             session = sf.createSession(null, null, false, true, true, false, 0);
          }
@@ -2052,7 +2060,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = createSessionFactory();
 
          session = sf.createSession(isXA, false, false);
 
@@ -2082,7 +2090,7 @@
             session.close();
             server.stop();
             server.start();
-            sf = locator.createSessionFactory();
+            sf = createSessionFactory();
             session = sf.createSession(isXA, false, false);
 
             session.rollback(xid);
@@ -2139,7 +2147,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = createSessionFactory();
 
          ClientSession session = sf.createSession(isXA, false, false);
 
@@ -2276,7 +2284,7 @@
          locator.setMinLargeMessageSize(1024);
          locator.setConsumerWindowSize(1024 * 1024);
 
-         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = createSessionFactory();
 
          session = sf.createSession(null, null, false, false, false, false, 0);
 
@@ -2380,7 +2388,7 @@
          locator.setMinLargeMessageSize(1024);
          locator.setConsumerWindowSize(1024 * 1024);
 
-         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = createSessionFactory();
 
          session = sf.createSession(null, null, false, false, false, false, 0);
 
@@ -2483,7 +2491,7 @@
 
          locator.setMinLargeMessageSize(100 * 1024);
 
-         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = createSessionFactory();
 
          session = sf.createSession(null, null, false, true, true, false, 0);
 
@@ -2557,7 +2565,7 @@
 
          locator.setMinLargeMessageSize(1024);
 
-         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = createSessionFactory();
 
          session = sf.createSession(null, null, false, true, true, false, 0);
 
@@ -2633,7 +2641,7 @@
 
       server.start();
 
-      ClientSessionFactory sf = locator.createSessionFactory();
+      ClientSessionFactory sf = createSessionFactory();
 
       ClientSession session = sf.createSession(false, false);
 
@@ -2687,6 +2695,77 @@
       }
    }
 
+   public void testLargeMessageCompression() throws Exception
+   {
+      final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+      ClientSession session = null;
+
+      try
+      {
+         server = createServer(true, isNetty());
+
+         server.start();
+
+         ClientSessionFactory sf = createSessionFactory();
+         sf.setCompressLargeMessages(true);
+
+         session = sf.createSession(false, false, false);
+
+         session.createTemporaryQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+         Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+         producer.send(clientFile);
+
+         session.commit();
+
+         session.start();
+
+         ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+         ClientMessage msg1 = consumer.receive(1000);
+         Assert.assertNotNull(msg1);
+         
+         for (int i = 0 ; i < messageSize; i++)
+         {
+            //System.out.print(msg1.getBodyBuffer().readByte() + "  ");
+            //if (i % 100 == 0) System.out.println();
+            byte b = msg1.getBodyBuffer().readByte();
+            //System.out.println("Byte read: " + (char)b + " i " + i);
+            assertEquals("position = "  + i, getSamplebyte(i), b);
+         }
+
+         msg1.acknowledge();
+         session.commit();
+
+         consumer.close();
+
+         session.close();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -2731,7 +2810,7 @@
 
       try
       {
-         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = createSessionFactory();
 
          if (sendBlocking)
          {
@@ -2776,7 +2855,7 @@
             server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
             server.start();
 
-            sf = locator.createSessionFactory();
+            sf = createSessionFactory();
          }
 
          session = sf.createSession(null, null, false, true, true, false, 0);

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -136,6 +136,7 @@
                                             callTimeout,
                                             HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                             HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                            HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                             HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                             HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                             HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -214,6 +214,7 @@
                                         callTimeout,
                                         HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                         HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                        HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                         HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                         HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                         HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -307,6 +307,7 @@
                                         callTimeout,
                                         true,
                                         HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                        HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                         HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                         HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                         HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -76,6 +76,7 @@
                                         HornetQClient.DEFAULT_CALL_TIMEOUT,
                                         HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                         HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                        HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                         HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                         HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                         HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -242,6 +242,7 @@
                                         callTimeout,
                                         true,
                                         HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                        HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                         HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                         HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                         HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -154,6 +154,7 @@
                                         callTimeout,
                                         HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                         HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                        HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                         HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                         HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                         HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -74,6 +74,7 @@
       assertEquals(7712652, cfConfig.getProducerWindowSize());
       assertEquals(789, cfConfig.getProducerMaxRate());
       assertEquals(12, cfConfig.getMinLargeMessageSize());
+      assertEquals(true, cfConfig.isCompressLargeMessages());
       assertEquals("TestClientID", cfConfig.getClientID());
       assertEquals(3456, cfConfig.getDupsOKBatchSize());
       assertEquals(4567, cfConfig.getTransactionBatchSize());

Modified: trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -15,6 +15,7 @@
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
@@ -35,12 +36,14 @@
 import org.hornetq.api.core.client.MessageHandler;
 import org.hornetq.core.client.impl.ClientConsumerInternal;
 import org.hornetq.core.client.impl.ClientMessageInternal;
+import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.client.impl.LargeMessageBufferImpl;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
 import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.HornetQBufferInputStream;
 
 /**
  * A LargeMessageBufferUnitTest
@@ -56,7 +59,7 @@
 
    // Attributes ----------------------------------------------------
 
-   static int tmpFileCounter = 0; 
+   static int tmpFileCounter = 0;
 
    // Static --------------------------------------------------------
 
@@ -67,13 +70,13 @@
    protected void setUp() throws Exception
    {
       super.setUp();
-      
+
       tmpFileCounter++;
 
       File tmp = new File(getTestDir());
       tmp.mkdirs();
    }
-   
+
    protected void tearDown() throws Exception
    {
       super.tearDown();
@@ -166,6 +169,20 @@
       }
    }
 
+   public void testReadIntegersOverStream() throws Exception
+   {
+      LargeMessageBufferImpl buffer = createBufferWithIntegers(3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
+      HornetQBufferInputStream is = new HornetQBufferInputStream(buffer);
+      DataInputStream dataInput = new DataInputStream(is);
+
+      for (int i = 1; i <= 15; i++)
+      {
+         Assert.assertEquals(i, dataInput.readInt());
+      }
+
+      assertEquals(-1, dataInput.read());
+   }
+
    // testing void getBytes(int index, ChannelBuffer dst, int dstIndex, int length)
    public void testReadLongs() throws Exception
    {
@@ -186,6 +203,20 @@
       }
    }
 
+   public void testReadLongsOverStream() throws Exception
+   {
+      LargeMessageBufferImpl buffer = createBufferWithLongs(3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
+      HornetQBufferInputStream is = new HornetQBufferInputStream(buffer);
+      DataInputStream dataInput = new DataInputStream(is);
+
+      for (int i = 1; i <= 15; i++)
+      {
+         Assert.assertEquals(i, dataInput.readLong());
+      }
+
+      assertEquals(-1, dataInput.read());
+   }
+
    public void testReadData() throws Exception
    {
       HornetQBuffer dynamic = HornetQBuffers.dynamicBuffer(1);
@@ -315,14 +346,14 @@
          Assert.assertEquals(i, bytes[i]);
       }
    }
-   
+
    public void testSplitBufferOnFile() throws Exception
    {
       LargeMessageBufferImpl outBuffer = new LargeMessageBufferImpl(new FakeConsumerInternal(),
-                                                                          1024 * 1024,
-                                                                          1,
-                                                                          getTestFile(),
-                                                                          1024);
+                                                                    1024 * 1024,
+                                                                    1,
+                                                                    getTestFile(),
+                                                                    1024);
       try
       {
 
@@ -525,6 +556,36 @@
 
    }
 
+   public void testReadBytesOnStreaming() throws Exception
+   {
+      byte[] byteArray = new byte[1024];
+      for (int i = 0; i < byteArray.length; i++)
+      {
+         byteArray[i] = getSamplebyte(i);
+      }
+
+      HornetQBuffer splitbuffer = splitBuffer(3, byteArray);
+
+      HornetQBufferInputStream is = new HornetQBufferInputStream(splitbuffer);
+
+      for (int i = 0; i < 100; i++)
+      {
+         assertEquals(getSamplebyte(i), (byte)is.read());
+      }
+
+      for (int i = 100; i < byteArray.length; i += 10)
+      {
+         byte readBytes[] = new byte[10];
+         
+         int size = is.read(readBytes);
+         
+         for (int j = 0; j < size; j++)
+         {
+            assertEquals(getSamplebyte(i + j), readBytes[j]);
+         }
+      }
+   }
+
    /**
     * @return
     */
@@ -795,6 +856,15 @@
          return null;
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.client.impl.ClientConsumerInternal#getSession()
+       */
+      public ClientSessionInternal getSession()
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
+
    }
 
 }

Copied: trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java (from rev 9972, branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.zip.Deflater;
+
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.DeflaterReader;
+import org.hornetq.utils.InflaterReader;
+import org.hornetq.utils.InflaterWriter;
+
+/**
+ * A CompressionUtilTest
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ */
+public class CompressionUtilTest extends UnitTestCase
+{
+   
+   public void testDeflaterReader() throws Exception
+   {
+      String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
+      byte[] input = inputString.getBytes("UTF-8");
+
+      ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
+      
+      DeflaterReader reader = new DeflaterReader(inputStream);
+
+      ArrayList<Integer> zipHolder = new ArrayList<Integer>();
+      int b = reader.read();
+      
+      while (b != -1)
+      {
+         zipHolder.add(b);
+         b = reader.read();
+      }
+      
+      byte[] allCompressed = new byte[zipHolder.size()];
+      for (int i = 0; i < allCompressed.length; i++)
+      {
+         allCompressed[i] = (byte) zipHolder.get(i).intValue();
+      }
+      
+      byte[] output = new byte[30];
+      Deflater compresser = new Deflater();
+      compresser.setInput(input);
+      compresser.finish();
+      int compressedDataLength = compresser.deflate(output);
+      
+      compareByteArray(allCompressed, output, compressedDataLength);
+   }
+   
+   public void testDeflaterReader2() throws Exception
+   {
+      String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
+      byte[] input = inputString.getBytes("UTF-8");
+
+      ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
+      
+      DeflaterReader reader = new DeflaterReader(inputStream);
+
+      byte[] buffer = new byte[7];
+      ArrayList<Integer> zipHolder = new ArrayList<Integer>();
+      
+      int n = reader.read(buffer);
+      while (n != -1)
+      {
+         for (int i = 0; i < n; i++)
+         {
+            zipHolder.add((int)buffer[i]);
+         }
+         n = reader.read(buffer);
+      }
+      
+      byte[] allCompressed = new byte[zipHolder.size()];
+      for (int i = 0; i < allCompressed.length; i++)
+      {
+         allCompressed[i] = (byte) zipHolder.get(i).intValue();
+      }
+      
+      byte[] output = new byte[30];
+      Deflater compresser = new Deflater();
+      compresser.setInput(input);
+      compresser.finish();
+      int compressedDataLength = compresser.deflate(output);
+      
+      compareByteArray(allCompressed, output, compressedDataLength);
+   }
+   
+   public void testInflaterReader() throws Exception
+   {
+      String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
+      byte[] input = inputString.getBytes("UTF-8");
+      byte[] output = new byte[30];
+      Deflater compresser = new Deflater();
+      compresser.setInput(input);
+      compresser.finish();
+      int compressedDataLength = compresser.deflate(output);
+
+      byte[] zipBytes = new byte[compressedDataLength];
+      
+      System.arraycopy(output, 0, zipBytes, 0, compressedDataLength);
+      ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes);
+      
+      InflaterReader inflater = new InflaterReader(byteInput);
+      ArrayList<Integer> holder = new ArrayList<Integer>();
+      int read = inflater.read();
+      
+      while (read != -1)
+      {
+         holder.add(read);
+         read = inflater.read();
+      }
+      
+      byte[] result = new byte[holder.size()];
+      
+      for (int i = 0; i < result.length; i++)
+      {
+         result[i] = holder.get(i).byteValue();
+      }
+      
+      String txt = new String(result);
+      
+      assertEquals(inputString, txt);
+
+   }
+   
+   public void testInflaterWriter() throws Exception
+   {
+      String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
+      byte[] input = inputString.getBytes("UTF-8");
+      byte[] output = new byte[30];
+      Deflater compresser = new Deflater();
+      compresser.setInput(input);
+      compresser.finish();
+      int compressedDataLength = compresser.deflate(output);
+
+      byte[] zipBytes = new byte[compressedDataLength];
+      
+      System.arraycopy(output, 0, zipBytes, 0, compressedDataLength);
+      ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes);
+      
+      ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
+      InflaterWriter writer = new InflaterWriter(byteOutput);
+      
+      byte[] zipBuffer = new byte[12];
+      
+      int n = byteInput.read(zipBuffer);
+      while (n > 0)
+      {
+         writer.write(zipBuffer, 0, n);
+         n = byteInput.read(zipBuffer);
+      }
+
+      writer.close();
+      
+      byte[] outcome = byteOutput.toByteArray();
+      String outStr = new String(outcome);
+      
+      assertEquals(inputString, outStr);
+   }
+   
+   private void compareByteArray(byte[] first, byte[] second, int length)
+   {
+      for (int i = 0; i < length; i++)
+      {
+         assertEquals(first[i], second[i]);
+      }
+   }
+}

Copied: trunk/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java (from rev 9972, branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.util;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.HornetQBufferInputStream;
+
+/**
+ * A HornetQInputStreamTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class HornetQBufferInputStreamTest extends UnitTestCase
+{
+
+   
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testReadBytes() throws Exception
+   {
+      byte bytes[] = new byte[10*1024];
+      for (int i = 0 ; i < bytes.length; i++)
+      {
+         bytes[i] = getSamplebyte(i);
+      }
+      
+      HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+      HornetQBufferInputStream is = new HornetQBufferInputStream(buffer);
+      
+      // First read byte per byte
+      for (int i = 0 ; i < 1024; i++)
+      {
+         assertEquals(getSamplebyte(i), is.read());
+      }
+      
+      // Second, read in chunks
+      for (int i = 1; i < 10; i++)
+      {
+         bytes = new byte[1024];
+         is.read(bytes);
+         for (int j = 0 ; j < bytes.length; j++)
+         {
+            assertEquals(getSamplebyte(i * 1024 + j), bytes[j]);
+         }
+         
+      }
+      
+      assertEquals(-1, is.read());
+      
+      
+      bytes = new byte[1024];
+      
+      int sizeRead = is.read(bytes);
+      
+      assertEquals(-1, sizeRead);
+      
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -190,6 +190,7 @@
                                         callTimeout,
                                         HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                         HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                        HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
                                         HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                         HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
                                         HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,

Modified: trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2010-12-05 10:00:09 UTC (rev 9993)
@@ -49,8 +49,6 @@
 import org.hornetq.spi.core.security.HornetQSecurityManager;
 import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
 
-import static org.hornetq.tests.util.ServiceTestBase.*;
-
 /**
  * 
  * Base class with basic utilities on starting up a basic server



More information about the hornetq-commits mailing list