[jboss-cvs] JBoss Messaging SVN: r7628 - in trunk: src/main/org/jboss/messaging/core/client/impl and 12 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jul 27 23:21:24 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-07-27 23:21:22 -0400 (Mon, 27 Jul 2009)
New Revision: 7628

Modified:
   trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
   trunk/src/main/org/jboss/messaging/core/client/LargeMessageBuffer.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/LargeMessageBufferImpl.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
   trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
   trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
   trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
   trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControl.java
   trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControlImpl.java
   trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSServerControlWrapper.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/bridge/BridgeTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControlTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/LargeMessageBufferTest.java
   trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1601 - Implementing Client cache for LargeMessages

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -62,6 +62,10 @@
    long getClientFailureCheckPeriod();
 
    void setClientFailureCheckPeriod(long clientFailureCheckPeriod);
+   
+   boolean isCacheLargeMessagesClient();
+   
+   void setCacheLargeMessagesClient(boolean cached);
 
    long getConnectionTTL();
 

Modified: trunk/src/main/org/jboss/messaging/core/client/LargeMessageBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/LargeMessageBuffer.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/core/client/LargeMessageBuffer.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -42,6 +42,8 @@
 
    void close();
 
+   void cancel();
+
    void setOutputStream(final OutputStream output) throws MessagingException;
 
    void saveBuffer(final OutputStream output) throws MessagingException;

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -29,6 +29,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
 import org.jboss.messaging.utils.Future;
 import org.jboss.messaging.utils.TokenBucketLimiter;
+import org.jboss.messaging.utils.UUIDGenerator;
 
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -70,13 +71,12 @@
 
    private final Runner runner = new Runner();
 
-   private final File directory;
-
    private ClientMessageInternal currentChunkMessage;
-   
+
    private LargeMessageBufferImpl currentLargeMessageBuffer;
-   
-   // When receiving LargeMessages, the user may choose to not read the body, on this case we need to discard te body before moving to the next message.
+
+   // When receiving LargeMessages, the user may choose to not read the body, on this case we need to discard the body
+   // before moving to the next message.
    private ClientMessageInternal largeMessageReceived;
 
    private final TokenBucketLimiter rateLimiter;
@@ -92,7 +92,7 @@
    private volatile boolean closed;
 
    private volatile int creditsToSend;
-   
+
    private volatile boolean slowConsumerInitialCreditSent = false;
 
    private volatile Exception lastException;
@@ -115,7 +115,6 @@
                              final TokenBucketLimiter rateLimiter,
                              final Executor executor,
                              final Channel channel,
-                             final File directory,
                              final boolean preAcknowledge)
    {
       this.id = id;
@@ -132,18 +131,16 @@
 
       this.ackBatchSize = ackBatchSize;
 
-      this.directory = directory;
-
       this.preAcknowledge = preAcknowledge;
    }
 
    // ClientConsumer implementation
    // -----------------------------------------------------------------
 
-   public ClientMessage receive(long timeout) throws MessagingException
+    public ClientMessage receive(long timeout) throws MessagingException
    {
       checkClosed();
-      
+
       if (largeMessageReceived != null)
       {
          // Check if there are pending packets to be received
@@ -161,7 +158,7 @@
          throw new MessagingException(MessagingException.ILLEGAL_STATE,
                                       "Cannot call receive(...) - a MessageHandler is set");
       }
-      
+
       if (clientWindowSize == 0)
       {
          startSlowConsumer();
@@ -218,20 +215,20 @@
 
             if (m != null)
             {
-               //if we have already pre acked we cant expire
+               // if we have already pre acked we cant expire
                boolean expired = !preAcknowledge && m.isExpired();
                flowControlBeforeConsumption(m);
 
                if (expired)
                {
                   m.discardLargeBody();
-                  
+
                   session.expire(id, m.getMessageID());
 
                   if (clientWindowSize == 0)
                   {
                      startSlowConsumer();
-                  }                 
+                  }
 
                   if (toWait > 0)
                   {
@@ -247,7 +244,7 @@
                {
                   this.largeMessageReceived = m;
                }
-               
+
                return m;
             }
             else
@@ -335,14 +332,6 @@
       return closed;
    }
 
-   /* (non-Javadoc)
-    * @see org.jboss.messaging.core.client.ClientConsumer#isLargeMessagesAsFiles()
-    */
-   public boolean isFileConsumer()
-   {
-      return directory != null;
-   }
-
    public void stop() throws MessagingException
    {
       waitForOnMessageToComplete();
@@ -389,7 +378,7 @@
       ClientMessageInternal messageToHandle = message;
 
       messageToHandle.onReceipt(this);
-      
+
       if (trace)
       {
          log.trace("Adding message " + message + " into buffer");
@@ -419,18 +408,26 @@
          // This is ok - we just ignore the message
          return;
       }
-      
+
       // Flow control for the first packet, we will have others
       flowControl(packet.getPacketSize(), false);
 
       currentChunkMessage = new ClientMessageImpl();
-      
+
       currentChunkMessage.decodeProperties(ChannelBuffers.wrappedBuffer(packet.getLargeMessageHeader()));
-      
+
       currentChunkMessage.setLargeMessage(true);
 
-      currentLargeMessageBuffer = new LargeMessageBufferImpl(this, packet.getLargeMessageSize(), 60);
+      File largeMessageCache = null;
+      
+      if (session.isCacheLargeMessageClient())
+      {
+         largeMessageCache = File.createTempFile("tmp-large-message-" + currentChunkMessage.getMessageID()+ "-", ".tmp");
+         largeMessageCache.deleteOnExit();
+      }
 
+      currentLargeMessageBuffer = new LargeMessageBufferImpl(this, packet.getLargeMessageSize(), 60, largeMessageCache);
+
       currentChunkMessage.setBody(currentLargeMessageBuffer);
 
       currentChunkMessage.setFlowControlSize(0);
@@ -444,7 +441,7 @@
       {
          return;
       }
-      
+
       currentLargeMessageBuffer.addPacket(chunk);
    }
 
@@ -502,17 +499,17 @@
       if (clientWindowSize >= 0)
       {
          creditsToSend += messageBytes;
-         
+
          if (creditsToSend >= clientWindowSize)
          {
-            
+
             if (clientWindowSize == 0 && discountSlowConsumer)
             {
                if (trace)
                {
                   log.trace("Sending " + creditsToSend + " -1, for slow consumer");
                }
-               
+
                slowConsumerInitialCreditSent = false;
 
                // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
@@ -520,7 +517,7 @@
                final int credits = creditsToSend - 1;
 
                creditsToSend = 0;
-               
+
                sendCredits(credits);
             }
             else
@@ -529,7 +526,7 @@
                {
                   log.trace("Sending " + messageBytes + " from flow-control");
                }
-             
+
                final int credits = creditsToSend;
 
                creditsToSend = 0;
@@ -669,12 +666,12 @@
                   log.trace("Calling handler.onMessage");
                }
                theHandler.onMessage(message);
-               
+
                if (trace)
                {
                   log.trace("Handler.onMessage done");
                }
-               
+
                if (message.isLargeMessage())
                {
                   message.discardLargeBody();
@@ -723,13 +720,12 @@
 
          // Now we wait for any current handler runners to run.
          waitForOnMessageToComplete();
-         
+
          if (currentLargeMessageBuffer != null)
          {
-            currentLargeMessageBuffer.close();
+            currentLargeMessageBuffer.cancel();
             currentLargeMessageBuffer = null;
          }
-         
 
          closed = true;
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -58,8 +58,6 @@
    void acknowledge(ClientMessage message) throws MessagingException;
    
    void flushAcks() throws MessagingException;
-   
-   boolean isFileConsumer();
 
    void stop() throws MessagingException;
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -108,6 +108,8 @@
    public static final int DEFAULT_THREAD_POOL_MAX_SIZE = -1;
 
    public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 5;
+   
+   public static final boolean DEFAULT_CACHE_LARGE_MESSAGE_CLIENT = false;
 
    // Attributes
    // -----------------------------------------------------------------------------------
@@ -129,6 +131,8 @@
    private boolean readOnly;
 
    // Settable attributes:
+   
+   private boolean cacheLargeMessagesClient = DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
 
    private List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors;
 
@@ -386,6 +390,17 @@
 
    // ClientSessionFactory implementation------------------------------------------------------------
 
+   public synchronized boolean isCacheLargeMessagesClient()
+   {
+      return cacheLargeMessagesClient;
+   }
+
+   public synchronized void setCacheLargeMessagesClient(boolean cached)
+   {
+      this.cacheLargeMessagesClient = cached;
+   }
+   
+   
    public synchronized List<Pair<TransportConfiguration, TransportConfiguration>> getStaticConnectors()
    {
       return staticConnectors;
@@ -966,6 +981,7 @@
                                                                  autoCommitAcks,
                                                                  preAcknowledge,
                                                                  ackBatchSize,
+                                                                 cacheLargeMessagesClient,
                                                                  minLargeMessageSize,
                                                                  blockOnAcknowledge,
                                                                  autoGroup,
@@ -1007,4 +1023,5 @@
 
       connectionManagerMap.values().toArray(connectionManagerArray);
    }
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -153,6 +153,8 @@
    private final boolean blockOnPersistentSend;
 
    private final int minLargeMessageSize;
+   
+   private final boolean cacheLargeMessageClient;
 
    private final Channel channel;
 
@@ -185,6 +187,7 @@
                             final int producerMaxRate,
                             final boolean blockOnNonPersistentSend,
                             final boolean blockOnPersistentSend,
+                            final boolean cacheLargeMessageClient,
                             final int minLargeMessageSize,
                             final RemotingConnection remotingConnection,
                             final int version,
@@ -226,6 +229,8 @@
       this.blockOnNonPersistentSend = blockOnNonPersistentSend;
 
       this.blockOnPersistentSend = blockOnPersistentSend;
+      
+      this.cacheLargeMessageClient = cacheLargeMessageClient;
 
       this.minLargeMessageSize = minLargeMessageSize;
    }
@@ -353,7 +358,7 @@
                                         final int maxRate,
                                         final boolean browseOnly) throws MessagingException
    {
-      return internalCreateConsumer(queueName, filterString, windowSize, maxRate, browseOnly, null);
+      return internalCreateConsumer(queueName, filterString, windowSize, maxRate, browseOnly);
    }
 
    public ClientConsumer createConsumer(final String queueName,
@@ -568,6 +573,20 @@
    // ClientSessionInternal implementation
    // ------------------------------------------------------------
 
+   
+   public int getMinLargeMessageSize()
+   {
+      return minLargeMessageSize;
+   }
+   
+   /**
+    * @return the cacheLargeMessageClient
+    */
+   public boolean isCacheLargeMessageClient()
+   {
+      return cacheLargeMessageClient;
+   }
+
    public String getName()
    {
       return name;
@@ -1123,8 +1142,7 @@
                                                  final SimpleString filterString,
                                                  final int windowSize,
                                                  final int maxRate,
-                                                 final boolean browseOnly,
-                                                 final File directory) throws MessagingException
+                                                 final boolean browseOnly) throws MessagingException
    {
       checkClosed();
 
@@ -1174,7 +1192,6 @@
                                                                                   : null,
                                                                executor,
                                                                channel,
-                                                               directory,
                                                                preAcknowledge);
 
       addConsumer(consumer);

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -29,7 +29,11 @@
    String getName();
 
    void acknowledge(long consumerID, long messageID) throws MessagingException;
-   
+
+   boolean isCacheLargeMessageClient();
+
+   int getMinLargeMessageSize();
+
    MessagingBuffer createBuffer(int size);
 
    void expire(long consumerID, long messageID) throws MessagingException;
@@ -51,8 +55,8 @@
    boolean handleFailover(RemotingConnection backupConnection);
 
    RemotingConnection getConnection();
-   
+
    void cleanUp() throws Exception;
-   
+
    void returnBlocking();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -44,6 +44,7 @@
                                final boolean autoCommitAcks,
                                final boolean preAcknowledge,
                                final int ackBatchSize,
+                               final boolean cacheLargeMessageClient,
                                final int minLargeMessageSize,
                                final boolean blockOnAcknowledge,
                                final boolean autoGroup,

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -256,6 +256,7 @@
                                       final boolean autoCommitAcks,
                                       final boolean preAcknowledge,
                                       final int ackBatchSize,
+                                      final boolean cacheLargeMessageClient,
                                       final int minLargeMessageSize,
                                       final boolean blockOnAcknowledge,
                                       final boolean autoGroup,
@@ -358,6 +359,7 @@
                                                                         producerMaxRate,
                                                                         blockOnNonPersistentSend,
                                                                         blockOnPersistentSend,
+                                                                        cacheLargeMessageClient,
                                                                         minLargeMessageSize,
                                                                         connection,
                                                                         response.getServerVersion(),

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/LargeMessageBufferImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/LargeMessageBufferImpl.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/LargeMessageBufferImpl.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -22,10 +22,14 @@
 
 package org.jboss.messaging.core.client.impl;
 
+import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.ScatteringByteChannel;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -34,6 +38,7 @@
 import org.jboss.messaging.core.buffers.ChannelBuffer;
 import org.jboss.messaging.core.client.LargeMessageBuffer;
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.utils.DataConstants;
@@ -56,6 +61,8 @@
 
    // Attributes ----------------------------------------------------
 
+   private static final Logger log = Logger.getLogger(LargeMessageBufferImpl.class);
+
    private final ClientConsumerInternal consumerInternal;
 
    private final LinkedBlockingQueue<SessionReceiveContinuationMessage> packets = new LinkedBlockingQueue<SessionReceiveContinuationMessage>();
@@ -80,6 +87,8 @@
 
    private Exception handledException;
 
+   private final FileCache fileCache;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -88,9 +97,25 @@
                                  final long totalSize,
                                  final int readTimeout)
    {
+      this(consumerInternal, totalSize, readTimeout, null);
+   }
+
+   public LargeMessageBufferImpl(final ClientConsumerInternal consumerInternal,
+                                 final long totalSize,
+                                 final int readTimeout,
+                                 final File cachedFile)
+   {
       this.consumerInternal = consumerInternal;
       this.readTimeout = readTimeout;
       this.totalSize = totalSize;
+      if (cachedFile == null)
+      {
+         this.fileCache = null;
+      }
+      else
+      {
+         this.fileCache = new FileCache(cachedFile);
+      }
    }
 
    // Public --------------------------------------------------------
@@ -125,7 +150,7 @@
    {
       int flowControlCredit = 0;
       boolean continues = false;
-      
+
       synchronized (this)
       {
          if (outStream != null)
@@ -137,6 +162,11 @@
                   streamEnded = true;
                }
 
+               if (fileCache != null)
+               {
+                  fileCache.cachePackage(packet.getBody());
+               }
+
                outStream.write(packet.getBody());
 
                flowControlCredit = packet.getPacketSize();
@@ -151,11 +181,25 @@
             }
             catch (Exception e)
             {
+               log.warn(e.getMessage(), e);
                handledException = e;
             }
          }
          else
          {
+            if (fileCache != null)
+            {
+               try
+               {
+                  fileCache.cachePackage(packet.getBody());
+               }
+               catch (Exception e)
+               {
+                  log.warn(e.getMessage(), e);
+                  handledException = e;
+               }
+            }
+
             packets.offer(packet);
          }
       }
@@ -168,18 +212,28 @@
          }
          catch (Exception e)
          {
+            log.warn(e.getMessage(), e);
             handledException = e;
          }
       }
    }
 
-   public synchronized void close()
+   public synchronized void cancel()
    {
       packets.offer(new SessionReceiveContinuationMessage());
       streamEnded = true;
+
       notifyAll();
    }
 
+   public synchronized void close()
+   {
+      if (fileCache != null)
+      {
+         fileCache.close();
+      }
+   }
+
    public void setOutputStream(final OutputStream output) throws MessagingException
    {
 
@@ -289,14 +343,23 @@
     */
    public byte getByte(final int index)
    {
-      checkForPacket(index);
-      return currentPacket.getBody()[(int)(index - packetPosition)];
+      return getByte((long)index);
    }
 
    private byte getByte(final long index)
    {
       checkForPacket(index);
-      return currentPacket.getBody()[(int)(index - packetPosition)];
+
+      //System.out.println("position = " + index + " , packetPosition = " + packetPosition + " filecache = " + fileCache);
+
+      if (fileCache != null && index < packetPosition)
+      {
+         return fileCache.getByteFromCache(index);
+      }
+      else
+      {
+         return currentPacket.getBody()[(int)(index - packetPosition)];
+      }
    }
 
    /* (non-Javadoc)
@@ -555,7 +618,15 @@
 
    public void readerIndex(final int readerIndex)
    {
-      checkForPacket(readerIndex);
+      try
+      {
+         checkForPacket(readerIndex);
+      }
+      catch (Exception e)
+      {
+         log.warn(e.getMessage(), e);
+         throw new RuntimeException(e.getMessage(), e);
+      }
       this.readerIndex = readerIndex;
    }
 
@@ -576,7 +647,15 @@
 
    public void setIndex(final int readerIndex, final int writerIndex)
    {
-      checkForPacket(readerIndex);
+      try
+      {
+         checkForPacket(readerIndex);
+      }
+      catch (Exception e)
+      {
+         log.warn(e.getMessage(), e);
+         throw new RuntimeException(e.getMessage(), e);
+      }
       this.readerIndex = readerIndex;
    }
 
@@ -620,7 +699,15 @@
 
    public void resetReaderIndex()
    {
-      checkForPacket(0);
+      try
+      {
+         checkForPacket(0);
+      }
+      catch (Exception e)
+      {
+         log.warn(e.getMessage(), e);
+         throw new RuntimeException(e.getMessage(), e);
+      }
    }
 
    public void markWriterIndex()
@@ -1186,11 +1273,15 @@
       {
          throw new IndexOutOfBoundsException();
       }
-      if (index < lastIndex)
+
+      if (fileCache == null)
       {
-         throw new IllegalAccessError("LargeMessage have read-only and one-way buffers");
+         if (index < lastIndex)
+         {
+            throw new IllegalAccessError("LargeMessage have read-only and one-way buffers");
+         }
+         lastIndex = index;
       }
-      lastIndex = index;
 
       while (index >= packetLastPosition && !streamEnded)
       {
@@ -1198,6 +1289,148 @@
       }
    }
 
+   /**
+    * @param body
+    */
    // Inner classes -------------------------------------------------
 
+   /** Spills received large-message chunks to a file so already-consumed bytes can be re-read. */
+   private class FileCache
+   {
+      private final int BUFFER_SIZE = 10 * 1024;
+
+      public FileCache(File cachedFile)
+      {
+         this.cachedFile = cachedFile;
+      }
+
+      ByteBuffer readCache;
+
+      long readCachePositionStart = Integer.MAX_VALUE;
+      long readCachePositionEnd = -1;
+
+      private final File cachedFile;
+
+      private volatile RandomAccessFile cachedRAFile;
+      private volatile FileChannel cachedChannel;
+
+      /** Reloads the in-memory window from the file when position lies outside it. */
+      private synchronized void readCache(long position)
+      {
+         try
+         {
+            if (position < readCachePositionStart || position > readCachePositionEnd)
+            {
+               checkOpen();
+
+               if (position >= cachedChannel.size())
+               {
+                  throw new ArrayIndexOutOfBoundsException("position >= " + cachedChannel.size());
+               }
+
+               readCachePositionStart = (position / BUFFER_SIZE) * BUFFER_SIZE;
+               if (readCache == null)
+               {
+                  readCache = ByteBuffer.allocate(BUFFER_SIZE);
+               }
+               readCache.clear();
+               // Read at the window start explicitly; the channel's own position is unrelated to it.
+               readCachePositionEnd = readCachePositionStart + cachedChannel.read(readCache, readCachePositionStart) - 1;
+            }
+         }
+         catch (Exception e)
+         {
+            log.warn(e.getMessage(), e);
+            throw new RuntimeException(e.getMessage(), e);
+         }
+         finally
+         {
+            close();
+         }
+      }
+
+      /** Returns the cached byte stored at the given absolute position. */
+      public synchronized byte getByteFromCache(long position)
+      {
+         readCache(position);
+
+         return readCache.get((int)(position - readCachePositionStart));
+      }
+
+      /** Appends one received chunk to the end of the cache file. */
+      public synchronized void cachePackage(byte[] body) throws Exception
+      {
+         checkOpen();
+         try
+         {
+            cachedChannel.position(cachedChannel.size());
+            cachedChannel.write(ByteBuffer.wrap(body));
+         }
+         finally
+         {
+            close();
+         }
+      }
+
+      /**
+      * Opens the cache file unless its channel is already open.
+      * @throws FileNotFoundException if the cache file cannot be opened
+      */
+      public void checkOpen() throws FileNotFoundException
+      {
+         if (cachedChannel == null || !cachedChannel.isOpen())
+         {
+            this.cachedRAFile = new RandomAccessFile(cachedFile, "rw");
+            cachedChannel = cachedRAFile.getChannel();
+         }
+      }
+
+      /** Closes the channel and the underlying file, logging (not propagating) any failure. */
+      public synchronized void close()
+      {
+         if (cachedChannel != null && cachedChannel.isOpen())
+         {
+            try
+            {
+               cachedChannel.close();
+            }
+            catch (Exception e)
+            {
+               log.warn(e.getMessage(), e);
+            }
+            cachedChannel = null;
+         }
+
+         if (cachedRAFile != null)
+         {
+            try
+            {
+               cachedRAFile.close();
+            }
+            catch (Exception e)
+            {
+               log.warn(e.getMessage(), e);
+            }
+            cachedRAFile = null;
+         }
+      }
+
+      /** Closes the file handles and deletes the cache file if it still exists. */
+      protected void finalize()
+      {
+         close();
+         if (cachedFile != null && cachedFile.exists())
+         {
+            try
+            {
+               cachedFile.delete();
+            }
+            catch (Exception e)
+            {
+               log.warn("Exception during finalization for LargeMessage file cache", e);
+            }
+         }
+      }
+   }
+
 }

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -46,9 +46,9 @@
  */
 public class JBossConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory,
          XAConnectionFactory, XAQueueConnectionFactory, XATopicConnectionFactory, Serializable/*
-                                                                                                 * , Referenceable
-                                                                                                 * http://jira.jboss.org/jira/browse/JBMESSAGING-395
-                                                                                                 */
+   * , Referenceable
+   * http://jira.jboss.org/jira/browse/JBMESSAGING-395
+   */
 {
    // Constants ------------------------------------------------------------------------------------
 
@@ -67,9 +67,9 @@
    private int dupsOKBatchSize = ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
 
    private int transactionBatchSize = ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
-   
+
    private boolean readOnly;
-   
+
    // Constructors ---------------------------------------------------------------------------------
 
    public JBossConnectionFactory()
@@ -102,7 +102,7 @@
    {
       this(connectorConfig, null);
    }
-      
+
    // ConnectionFactory implementation -------------------------------------------------------------
 
    public Connection createConnection() throws JMSException
@@ -239,7 +239,7 @@
 
    public synchronized long getDiscoveryInitialWaitTimeout()
    {
-     return sessionFactory.getDiscoveryInitialWaitTimeout();
+      return sessionFactory.getDiscoveryInitialWaitTimeout();
    }
 
    public synchronized void setDiscoveryInitialWaitTimeout(long discoveryInitialWaitTimeout)
@@ -253,7 +253,7 @@
    }
 
    public synchronized void setClientID(String clientID)
-   {      
+   {
       checkWrite();
       this.clientID = clientID;
    }
@@ -350,6 +350,19 @@
       sessionFactory.setProducerMaxRate(producerMaxRate);
    }
 
+   /**
+    * Sets the cacheLargeMessagesClient flag on the underlying core session factory.
+    * Calls checkWrite() before changing anything, in the same way as setClientID.
+    *
+    * @param cacheLargeMessagesClient the value forwarded to the core session factory
+    */
+   public synchronized void setCacheLargeMessagesClient(boolean cacheLargeMessagesClient)
+   {
+      checkWrite();
+      sessionFactory.setCacheLargeMessagesClient(cacheLargeMessagesClient);
+   }
+
+   /**
+    * @return the cacheLargeMessagesClient flag as reported by the underlying core session factory
+    */
+   public synchronized boolean isCacheLargeMessagesClient()
+   {
+      final boolean cachingEnabled = sessionFactory.isCacheLargeMessagesClient();
+
+      return cachingEnabled;
+   }
+
    public synchronized int getMinLargeMessageSize()
    {
       return sessionFactory.getMinLargeMessageSize();
@@ -459,7 +472,7 @@
    {
       sessionFactory.setFailoverOnServerShutdown(failoverOnServerShutdown);
    }
-   
+
    public synchronized boolean isUseGlobalPools()
    {
       return sessionFactory.isUseGlobalPools();
@@ -469,7 +482,7 @@
    {
       sessionFactory.setUseGlobalPools(useGlobalPools);
    }
-   
+
    public synchronized int getScheduledThreadPoolMaxSize()
    {
       return sessionFactory.getScheduledThreadPoolMaxSize();
@@ -489,12 +502,12 @@
    {
       sessionFactory.setThreadPoolMaxSize(threadPoolMaxSize);
    }
-   
+
    public ClientSessionFactory getCoreFactory()
    {
       return sessionFactory;
    }
-   
+
    public void close()
    {
       sessionFactory.close();
@@ -510,7 +523,7 @@
                                                                    final int type) throws JMSException
    {
       readOnly = true;
-      
+
       JBossConnection connection = new JBossConnection(username,
                                                        password,
                                                        type,
@@ -534,7 +547,6 @@
       }
    }
 
-   
    // Inner classes --------------------------------------------------------------------------------
 
 }

Modified: trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -150,6 +150,7 @@
                                 long connectionTTL,
                                 long callTimeout,
                                 int maxConnections,
+                                boolean cacheLargeMessagesClient,
                                 int minLargeMessageSize,
                                 int consumerWindowSize,
                                 int consumerMaxRate,
@@ -181,6 +182,7 @@
                                 long connectionTTL,
                                 long callTimeout,
                                 int maxConnections,
+                                boolean cacheLargeMessagesClient,
                                 int minLargeMessageSize,
                                 int consumerWindowSize,
                                 int consumerMaxRate,

Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -133,6 +133,7 @@
          int consumerMaxRate = getInteger(e, "consumer-max-rate", ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE, MINUS_ONE_OR_GT_ZERO);
          int producerWindowSize = getInteger(e, "producer-window-size", ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE, GT_ZERO);
          int producerMaxRate = getInteger(e, "producer-max-rate", ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE, MINUS_ONE_OR_GT_ZERO);
+         boolean cacheLargeMessagesClient = getBoolean(e, "cache-large-message-client", ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT);
          int minLargeMessageSize = getInteger(e, "min-large-message-size", ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE, GT_ZERO);
          boolean blockOnAcknowledge = getBoolean(e, "block-on-acknowledge", ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE);
          boolean blockOnNonPersistentSend = getBoolean(e, "block-on-non-persistent-send", ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND);
@@ -235,6 +236,7 @@
                                                      connectionTTL,
                                                      callTimeout,
                                                      maxConnections,
+                                                     cacheLargeMessagesClient,
                                                      minLargeMessageSize,
                                                      consumerWindowSize,
                                                      consumerMaxRate,
@@ -267,6 +269,7 @@
                                                      connectionTTL,
                                                      callTimeout,
                                                      maxConnections,
+                                                     cacheLargeMessagesClient,
                                                      minLargeMessageSize,
                                                      consumerWindowSize,
                                                      consumerMaxRate,

Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -351,6 +351,7 @@
                                                     long connectionTTL,
                                                     long callTimeout,
                                                     int maxConnections,
+                                                    boolean cacheLargeMessagesClient,
                                                     int minLargeMessageSize,
                                                     int consumerWindowSize,
                                                     int consumerMaxRate,
@@ -383,6 +384,7 @@
          cf.setConnectionTTL(connectionTTL);
          cf.setCallTimeout(callTimeout);
          cf.setMaxConnections(maxConnections);
+         cf.setCacheLargeMessagesClient(cacheLargeMessagesClient);
          cf.setMinLargeMessageSize(minLargeMessageSize);
          cf.setConsumerWindowSize(consumerWindowSize);
          cf.setConsumerMaxRate(consumerMaxRate);
@@ -417,6 +419,7 @@
                                                     long connectionTTL,
                                                     long callTimeout,
                                                     int maxConnections,
+                                                    boolean cacheLargeMessagesClient,
                                                     int minLargeMessageSize,
                                                     int consumerWindowSize,
                                                     int consumerMaxRate,
@@ -451,6 +454,7 @@
          cf.setConnectionTTL(connectionTTL);
          cf.setCallTimeout(callTimeout);
          cf.setMaxConnections(maxConnections);
+         cf.setCacheLargeMessagesClient(cacheLargeMessagesClient);
          cf.setMinLargeMessageSize(minLargeMessageSize);
          cf.setConsumerWindowSize(consumerWindowSize);
          cf.setConsumerMaxRate(consumerMaxRate);

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControl.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControl.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -103,6 +103,7 @@
                                 long connectionTTL,
                                 long callTimeout,
                                 int maxConnections,
+                                boolean cacheLargeMessageClient,
                                 int minLargeMessageSize,
                                 int consumerWindowSize,
                                 int consumerMaxRate,
@@ -135,6 +136,7 @@
                                 @Parameter(name = "connectionTTL") long connectionTTL,
                                 @Parameter(name = "callTimeout") long callTimeout,
                                 @Parameter(name = "maxConnections") int maxConnections,
+                                @Parameter(name = "cacheLargemessageClient") boolean cacheLargeMessageClient,
                                 @Parameter(name = "minLargeMessageSize") int minLargeMessageSize,
                                 @Parameter(name = "consumerWindowSize") int consumerWindowSize,
                                 @Parameter(name = "consumerMaxRate") int consumerMaxRate,
@@ -179,6 +181,7 @@
                                 long connectionTTL,
                                 long callTimeout,
                                 int maxConnections,
+                                boolean cacheLargeMessageClient,
                                 int minLargeMessageSize,
                                 int consumerWindowSize,
                                 int consumerMaxRate,
@@ -212,6 +215,7 @@
                                 @Parameter(name = "connectionTTL") long connectionTTL,
                                 @Parameter(name = "callTimeout") long callTimeout,
                                 @Parameter(name = "maxConnections") int maxConnections,
+                                @Parameter(name = "cacheLargemessageClient") boolean cacheLargeMessageClient,
                                 @Parameter(name = "minLargeMessageSize") int minLargeMessageSize,
                                 @Parameter(name = "consumerWindowSize") int consumerWindowSize,
                                 @Parameter(name = "consumerMaxRate") int consumerMaxRate,

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControlImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControlImpl.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControlImpl.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -218,6 +218,7 @@
                                        final long connectionTTL,
                                        final long callTimeout,
                                        final int maxConnections,
+                                       final boolean cacheLargeMessageClient,
                                        final int minLargeMessageSize,
                                        final int consumerWindowSize,
                                        final int consumerMaxRate,
@@ -254,6 +255,7 @@
                                      connectionTTL,
                                      callTimeout,
                                      maxConnections,
+                                     cacheLargeMessageClient,
                                      minLargeMessageSize,
                                      consumerWindowSize,
                                      consumerMaxRate,
@@ -289,6 +291,7 @@
                                        final long connectionTTL,
                                        final long callTimeout,
                                        final int maxConnections,
+                                       final boolean cacheLargeMessageClient,
                                        final int minLargeMessageSize,
                                        final int consumerWindowSize,
                                        final int consumerMaxRate,
@@ -327,6 +330,7 @@
                               connectionTTL,
                               callTimeout,
                               maxConnections,
+                              cacheLargeMessageClient,
                               minLargeMessageSize,
                               consumerWindowSize,
                               consumerMaxRate,
@@ -383,6 +387,7 @@
                                        final long connectionTTL,
                                        final long callTimeout,
                                        final int maxConnections,
+                                       final boolean cacheLargeMessageClient,
                                        final int minLargeMessageSize,
                                        final int consumerWindowSize,
                                        final int consumerMaxRate,
@@ -417,6 +422,7 @@
                                      connectionTTL,
                                      callTimeout,
                                      maxConnections,
+                                     cacheLargeMessageClient,
                                      minLargeMessageSize,
                                      consumerWindowSize,
                                      consumerMaxRate,
@@ -452,6 +458,7 @@
                                        final long connectionTTL,
                                        final long callTimeout,
                                        final int maxConnections,
+                                       final boolean cacheLargeMessageClient,
                                        final int minLargeMessageSize,
                                        final int consumerWindowSize,
                                        final int consumerMaxRate,
@@ -486,6 +493,7 @@
                               connectionTTL,
                               callTimeout,
                               maxConnections,
+                              cacheLargeMessageClient,
                               minLargeMessageSize,
                               consumerWindowSize,
                               consumerMaxRate,

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSServerControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSServerControlWrapper.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSServerControlWrapper.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -76,6 +76,7 @@
                                        final long connectionTTL,
                                        final long callTimeout,
                                        final int maxConnections,
+                                       final boolean cacheLargeMessageClient,
                                        final int minLargeMessageSize,
                                        final int consumerWindowSize,
                                        final int consumerMaxRate,
@@ -108,6 +109,7 @@
                              connectionTTL,
                              callTimeout,
                              maxConnections,
+                             cacheLargeMessageClient,
                              minLargeMessageSize,
                              consumerWindowSize,
                              consumerMaxRate,
@@ -141,6 +143,7 @@
                                        final long connectionTTL,
                                        final long callTimeout,
                                        final int maxConnections,
+                                       final boolean cacheLargeMessageClient,
                                        final int minLargeMessageSize,
                                        final int consumerWindowSize,
                                        final int consumerMaxRate,
@@ -173,6 +176,7 @@
                              connectionTTL,
                              callTimeout,
                              maxConnections,
+                             cacheLargeMessageClient,
                              minLargeMessageSize,
                              consumerWindowSize,
                              consumerMaxRate,
@@ -338,6 +342,7 @@
                                        final long connectionTTL,
                                        final long callTimeout,
                                        final int maxConnections,
+                                       final boolean cacheLargeMessageClient,
                                        final int minLargeMessageSize,
                                        final int consumerWindowSize,
                                        final int consumerMaxRate,
@@ -371,6 +376,7 @@
                              connectionTTL,
                              callTimeout,
                              maxConnections,
+                             cacheLargeMessageClient,
                              minLargeMessageSize,
                              consumerWindowSize,
                              consumerMaxRate,
@@ -404,6 +410,7 @@
                                        final long connectionTTL,
                                        final long callTimeout,
                                        final int maxConnections,
+                                       final boolean cacheLargeMessageClient,
                                        final int minLargeMessageSize,
                                        final int consumerWindowSize,
                                        final int consumerMaxRate,
@@ -437,6 +444,7 @@
                              connectionTTL,
                              callTimeout,
                              maxConnections,
+                             cacheLargeMessageClient,
                              minLargeMessageSize,
                              consumerWindowSize,
                              consumerMaxRate,

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -31,6 +31,7 @@
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
@@ -101,6 +102,7 @@
                                                        DEFAULT_CONNECTION_TTL,
                                                        DEFAULT_CALL_TIMEOUT,
                                                        DEFAULT_MAX_CONNECTIONS,
+                                                       DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                                        DEFAULT_MIN_LARGE_MESSAGE_SIZE,
                                                        DEFAULT_CONSUMER_WINDOW_SIZE,
                                                        DEFAULT_CONSUMER_MAX_RATE,

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -10,6 +10,7 @@
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
@@ -76,6 +77,7 @@
                                                     DEFAULT_CONNECTION_TTL,
                                                     DEFAULT_CALL_TIMEOUT,
                                                     DEFAULT_MAX_CONNECTIONS,
+                                                    DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                                     DEFAULT_MIN_LARGE_MESSAGE_SIZE,
                                                     DEFAULT_CONSUMER_WINDOW_SIZE,
                                                     DEFAULT_CONSUMER_MAX_RATE,

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -30,6 +30,7 @@
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
@@ -325,6 +326,7 @@
                                                     DEFAULT_CONNECTION_TTL,
                                                     DEFAULT_CALL_TIMEOUT,
                                                     DEFAULT_MAX_CONNECTIONS,
+                                                    DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,                                                    
                                                     DEFAULT_MIN_LARGE_MESSAGE_SIZE,
                                                     prefetchSize,
                                                     DEFAULT_CONSUMER_MAX_RATE,

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/bridge/BridgeTestBase.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/bridge/BridgeTestBase.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -190,6 +190,7 @@
             cf.setReconnectAttempts(0);
             cf.setBlockOnNonPersistentSend(true);
             cf.setBlockOnPersistentSend(true);
+            cf.setCacheLargeMessagesClient(true);
             
             return cf;
          }
@@ -209,6 +210,7 @@
             cf.setReconnectAttempts(0);
             cf.setBlockOnNonPersistentSend(true);
             cf.setBlockOnPersistentSend(true);
+            cf.setCacheLargeMessagesClient(true);
             
             return cf;
          }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -47,10 +47,10 @@
 
 //  The bridge won't work with largeMessages & failures
 //   https://jira.jboss.org/jira/browse/JBMESSAGING-1601 
-//   public void testCrashAndReconnectDestBasic_OnceAndOnlyOnce_P_LargeMessage() throws Exception
-//   {
-//      testCrashAndReconnectDestBasic(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, true);
-//   }
+   /** Runs the basic crash-and-reconnect scenario in ONCE_AND_ONLY_ONCE mode with both flags set to true. */
+   public void testCrashAndReconnectDestBasic_OnceAndOnlyOnce_P_LargeMessage() throws Exception
+   {
+      final QualityOfServiceMode qosMode = QualityOfServiceMode.ONCE_AND_ONLY_ONCE;
+
+      testCrashAndReconnectDestBasic(qosMode, true, true);
+   }
    
    public void testCrashAndReconnectDestBasic_OnceAndOnlyOnce_NP() throws Exception
    {

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControlTest.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControlTest.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -406,6 +406,7 @@
                                             ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
                                             ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
                                             ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+                                            ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                             ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
                                             ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
                                             ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
@@ -450,6 +451,7 @@
                                             ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
                                             ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
                                             ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+                                            ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
                                             ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
                                             ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
                                             ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -96,6 +96,7 @@
                                              long connectionTTL,
                                              long callTimeout,
                                              int maxConnections,
+                                             boolean cacheLargeMessageClient,
                                              int minLargeMessageSize,
                                              int consumerWindowSize,
                                              int consumerMaxRate,
@@ -128,6 +129,7 @@
                                   connectionTTL,
                                   callTimeout,
                                   maxConnections,
+                                  cacheLargeMessageClient,
                                   minLargeMessageSize,
                                   consumerWindowSize,
                                   consumerMaxRate,
@@ -161,6 +163,7 @@
                                              long connectionTTL,
                                              long callTimeout,
                                              int maxConnections,
+                                             boolean cacheLargeMessageClient,
                                              int minLargeMessageSize,
                                              int consumerWindowSize,
                                              int consumerMaxRate,
@@ -193,6 +196,7 @@
                                   connectionTTL,
                                   callTimeout,
                                   maxConnections,
+                                  cacheLargeMessageClient,
                                   minLargeMessageSize,
                                   consumerWindowSize,
                                   consumerMaxRate,
@@ -375,6 +379,7 @@
                                              long connectionTTL,
                                              long callTimeout,
                                              int maxConnections,
+                                             boolean cacheLargeMessageClient,
                                              int minLargeMessageSize,
                                              int consumerWindowSize,
                                              int consumerMaxRate,
@@ -408,6 +413,7 @@
                                   connectionTTL,
                                   callTimeout,
                                   maxConnections,
+                                  cacheLargeMessageClient,
                                   minLargeMessageSize,
                                   consumerWindowSize,
                                   consumerMaxRate,
@@ -442,6 +448,7 @@
                                              long connectionTTL,
                                              long callTimeout,
                                              int maxConnections,
+                                             boolean cacheLargeMessageClient,
                                              int minLargeMessageSize,
                                              int consumerWindowSize,
                                              int consumerMaxRate,
@@ -475,6 +482,7 @@
                                   connectionTTL,
                                   callTimeout,
                                   maxConnections,
+                                  cacheLargeMessageClient,
                                   minLargeMessageSize,
                                   consumerWindowSize,
                                   consumerMaxRate,

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/LargeMessageBufferTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/LargeMessageBufferTest.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/LargeMessageBufferTest.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -25,6 +25,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PipedInputStream;
@@ -195,6 +196,44 @@
       assertEquals(f1, readBuffer.readFloat());
    }
 
+   private File getTestFile()
+   {
+      return new File(getTestDir(), "temp.file");
+   }
+   
+   public void testReadDataOverCached() throws Exception
+   {
+      clearData();
+      
+      ChannelBuffer dynamic = ChannelBuffers.dynamicBuffer(1);
+
+      String str1 = RandomUtil.randomString();
+      String str2 = RandomUtil.randomString();
+      Double d1 = RandomUtil.randomDouble();
+      float f1 = RandomUtil.randomFloat();
+
+      dynamic.writeUTF(str1);
+      dynamic.writeString(str2);
+      dynamic.writeDouble(d1);
+      dynamic.writeFloat(f1);
+
+      LargeMessageBufferImpl readBuffer = splitBuffer(3, dynamic.array(), getTestFile());
+
+      assertEquals(str1, readBuffer.readUTF());
+      assertEquals(str2, readBuffer.readString());
+      assertEquals(d1, readBuffer.readDouble());
+      assertEquals(f1, readBuffer.readFloat());
+      
+      readBuffer.readerIndex(0);
+      
+      assertEquals(str1, readBuffer.readUTF());
+      assertEquals(str2, readBuffer.readString());
+      assertEquals(d1, readBuffer.readDouble());
+      assertEquals(f1, readBuffer.readFloat());
+      
+      readBuffer.close();
+   }
+
    public void testReadPartialData() throws Exception
    {
 
@@ -239,7 +278,7 @@
 
       latchGo.await();
 
-      buffer.close();
+      buffer.cancel();
 
       t.join();
 
@@ -262,7 +301,9 @@
 
    public void testStreamData() throws Exception
    {
-      final LargeMessageBufferImpl outBuffer = new LargeMessageBufferImpl(new FakeConsumerInternal(), 1024 * 11 + 123, 1);
+      final LargeMessageBufferImpl outBuffer = new LargeMessageBufferImpl(new FakeConsumerInternal(),
+                                                                          1024 * 11 + 123,
+                                                                          1);
 
       final PipedOutputStream output = new PipedOutputStream();
       final PipedInputStream input = new PipedInputStream(output);
@@ -373,11 +414,11 @@
       assertEquals(0, errors.get());
 
    }
-   
+
    public void testStreamDataWaitCompletionOnCompleteBuffer() throws Exception
    {
       final LargeMessageBufferImpl outBuffer = create15BytesSample();
-      
+
       outBuffer.saveBuffer(new OutputStream()
       {
          @Override
@@ -395,7 +436,6 @@
 
       outBuffer.addPacket(new SessionReceiveContinuationMessage(-1, new byte[] { 0, 1, 2, 3, 4 }, true, false));
 
-
       final CountDownLatch latchBytesWritten1 = new CountDownLatch(5);
       final CountDownLatch latchBytesWritten2 = new CountDownLatch(10);
 
@@ -408,8 +448,7 @@
             latchBytesWritten2.countDown();
          }
       });
-      
-      
+
       latchBytesWritten1.await();
 
       try
@@ -420,8 +459,7 @@
       catch (IllegalAccessError ignored)
       {
       }
-      
-      
+
       assertTrue("It waited too much", System.currentTimeMillis() - start < 30000);
 
    }
@@ -462,8 +500,13 @@
 
    private LargeMessageBufferImpl splitBuffer(int splitFactor, byte[] bytes) throws Exception
    {
-      LargeMessageBufferImpl outBuffer = new LargeMessageBufferImpl(new FakeConsumerInternal(), bytes.length, 5);
+      return splitBuffer(splitFactor, bytes, null);
+   }
 
+   private LargeMessageBufferImpl splitBuffer(int splitFactor, byte[] bytes, File file) throws Exception
+   {
+      LargeMessageBufferImpl outBuffer = new LargeMessageBufferImpl(new FakeConsumerInternal(), bytes.length, 5, file);
+
       ByteArrayInputStream input = new ByteArrayInputStream(bytes);
 
       while (true)
@@ -700,6 +743,21 @@
 
       }
 
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.client.ClientConsumer#getLargeMessageCacheDir()
+       */
+      public File getLargeMessageCacheDir()
+      {
+          return null;
+      }
+
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.client.ClientConsumer#setLargeMessageCacheDir(java.io.File)
+       */
+      public void setLargeMessageCacheDir(File largeMessageCacheDir)
+      {
+      }
+
    }
 
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -99,28 +99,6 @@
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
-
-   protected void clearData()
-   {
-      clearData(getTestDir());
-   }
-
-   protected void clearData(String testDir)
-   {
-      // Need to delete the root
-
-      File file = new File(testDir);
-      deleteDirectory(file);
-      file.mkdirs();
-
-      recreateDirectory(getJournalDir(testDir));
-      recreateDirectory(getBindingsDir(testDir));
-      recreateDirectory(getPageDir(testDir));
-      recreateDirectory(getLargeMessagesDir(testDir));
-      recreateDirectory(getClientLargeMessagesDir(testDir));
-      recreateDirectory(getTemporaryDir(testDir));
-   }
-
    protected Configuration createConfigForJournal()
    {
       Configuration config = new ConfigurationImpl();

Modified: trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java	2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java	2009-07-28 03:21:22 UTC (rev 7628)
@@ -273,6 +273,28 @@
       return testDir;
    }
 
+   protected void clearData()
+   {
+      clearData(getTestDir());
+   }
+
+   protected void clearData(String testDir)
+   {
+      // Need to delete the root
+
+      File file = new File(testDir);
+      deleteDirectory(file);
+      file.mkdirs();
+
+      recreateDirectory(getJournalDir(testDir));
+      recreateDirectory(getBindingsDir(testDir));
+      recreateDirectory(getPageDir(testDir));
+      recreateDirectory(getLargeMessagesDir(testDir));
+      recreateDirectory(getClientLargeMessagesDir(testDir));
+      recreateDirectory(getTemporaryDir(testDir));
+   }
+
+
    /**
     * @return the journalDir
     */




More information about the jboss-cvs-commits mailing list