[jboss-cvs] JBoss Messaging SVN: r5185 - in branches/Branch_Chunk_CRS2: src/main/org/jboss/messaging/core/client/impl and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Oct 27 17:12:24 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-10-27 17:12:24 -0400 (Mon, 27 Oct 2008)
New Revision: 5185

Added:
   branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/soak/chunk/
   branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java
Modified:
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/ClientMessage.java
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
Log:
Fixing some issues with flow control and huge files

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/ClientMessage.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/ClientMessage.java	2008-10-27 16:57:12 UTC (rev 5184)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/ClientMessage.java	2008-10-27 21:12:24 UTC (rev 5185)
@@ -44,4 +44,8 @@
    void onReceipt(ClientSessionInternal session, long consumerID);
    
    void acknowledge() throws MessagingException;
+   
+   void setLargeMessage(boolean largeMessage);
+   
+   boolean isLargeMessage();
 }

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-10-27 16:57:12 UTC (rev 5184)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-10-27 21:12:24 UTC (rev 5185)
@@ -13,17 +13,22 @@
 package org.jboss.messaging.core.client.impl;
 
 import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.Executor;
 
 import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.FileClientMessage;
 import org.jboss.messaging.core.client.MessageHandler;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.util.Future;
 
@@ -67,6 +72,8 @@
    
    private File largeMessagesDir;
 
+   private ClientMessage currentChunkMessage;
+
    
    private volatile Thread receiverThread;
 
@@ -157,7 +164,11 @@
 
                boolean expired = m.isExpired();
 
-               flowControl(m.getEncodeSize());
+               // Chunk messages will execute the flow control while receiving the chunks
+               if (!m.isLargeMessage())
+               {
+                  flowControl(m.getEncodeSize());
+               }
 
                if (expired)
                {
@@ -200,12 +211,14 @@
          FileClientMessageImpl message = new FileClientMessageImpl();
          message.decodeProperties(propertiesBuffer);
          message.setFile(new File(this.largeMessagesDir, message.getMessageID() + "-" + this.session.getName() + "-" + this.getID() + ".jbm"));
+         message.setLargeMessage(true);
          return message;
       }
       else
       {
          ClientMessageImpl message = new ClientMessageImpl();
          message.decodeProperties(propertiesBuffer);
+         message.setLargeMessage(true);
          return message;
       }
    }
@@ -312,7 +325,12 @@
 
             boolean expired = message.isExpired();
 
-            flowControl(message.getEncodeSize());
+            // Large (chunked) messages are flow-controlled per chunk in handleChunk(),
+            // so their credits must not be counted a second time here
+            if (!message.isLargeMessage())
+            {
+               flowControl(message.getEncodeSize());
+            }
 
             if (!expired)
             {
@@ -340,7 +358,76 @@
          notify();
       }
    }
+   
+   
+   public void handleChunk(SessionSendChunkMessage chunk) throws Exception
+   {
+      if (closed)
+      {
+         return;
+      }
+      
+      flowControl(chunk.getPacketSize());
+      
 
+      if (chunk.getHeader() != null)
+      {
+
+         // The header only comes with the first chunk, so the client-side message that will
+         // accumulate the body (either file-backed or in-memory) has to be created here
+         MessagingBuffer header = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getHeader()));
+
+         currentChunkMessage = createFileMessage(header);
+
+         if (currentChunkMessage instanceof FileClientMessage)
+         {
+            FileClientMessage fileMessage = (FileClientMessage)currentChunkMessage;
+            addBytesBody(fileMessage, chunk.getBody());
+         }
+         else
+         {
+            MessagingBuffer initialBody = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getBody()));
+            currentChunkMessage.setBody(initialBody);
+         }
+      }
+      else
+      {
+         // No header: this chunk is a continuation of the message currently being assembled
+         ByteBuffer body = ByteBuffer.wrap(chunk.getBody());
+
+         if (currentChunkMessage instanceof FileClientMessage)
+         {
+            FileClientMessage fileMessage = (FileClientMessage)currentChunkMessage;
+            addBytesBody(fileMessage, chunk.getBody());
+         }
+         else
+         {
+            MessagingBuffer currentBody = currentChunkMessage.getBody();
+
+            MessagingBuffer newBody = new ByteBufferWrapper(ByteBuffer.allocate(currentBody.limit() + body.limit()));
+
+            newBody.putBytes(currentBody.array());
+            newBody.putBytes(body.array());
+
+            currentChunkMessage.setBody(newBody);
+         }
+      }
+
+      if (!chunk.isContinues())
+      {
+         // Close the file that was being generated
+         if (currentChunkMessage instanceof FileClientMessage)
+         {
+            ((FileClientMessage)currentChunkMessage).closeChannel();
+         }
+         ClientMessage msgToSend = currentChunkMessage;
+         currentChunkMessage = null;
+         handleMessage(msgToSend);
+      }
+      
+   }
+   
+
    public void clear()
    {
       synchronized (this)
@@ -517,6 +604,13 @@
       }
    }
 
+
+   private void addBytesBody(FileClientMessage fileMessage, byte[] body) throws Exception
+   {
+      FileChannel channel = fileMessage.getChannel();
+      channel.write(ByteBuffer.wrap(body));
+   }
+
    // Inner classes
    // --------------------------------------------------------------------------------
 

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-10-27 16:57:12 UTC (rev 5184)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-10-27 21:12:24 UTC (rev 5185)
@@ -24,6 +24,7 @@
 
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 
 /**
@@ -38,6 +39,8 @@
    long getID();
 
    void handleMessage(ClientMessage message) throws Exception;
+   
+   void handleChunk(SessionSendChunkMessage chunk) throws Exception;
 
    void clear();
 

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java	2008-10-27 16:57:12 UTC (rev 5184)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java	2008-10-27 21:12:24 UTC (rev 5185)
@@ -42,6 +42,8 @@
    private long consumerID;
    
    private ClientSessionInternal session;
+   
+   private boolean largeMessage;
            
    /*
     * Constructor for when reading from network
@@ -100,4 +102,22 @@
          session.acknowledge(consumerID, messageID);
       }
    }
+
+   /**
+    * @return the largeMessage
+    */
+   public boolean isLargeMessage()
+   {
+      return largeMessage;
+   }
+
+   /**
+    * @param largeMessage the largeMessage to set
+    */
+   public void setLargeMessage(boolean largeMessage)
+   {
+      this.largeMessage = largeMessage;
+   }
+   
+   
 }

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-10-27 16:57:12 UTC (rev 5184)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-10-27 21:12:24 UTC (rev 5185)
@@ -332,7 +332,12 @@
 
       if (msg.getEncodeSize() > bigMessageSize)
       {
-         sendMessageInChunks(sendBlocking, msg, scheduledDeliveryTime);
+         // TODO: Message chunks have to be sent blocking until producer flow control is implemented.
+         //       When sending really big messages, non-blocking sends could overuse the communication channel
+         //       to the point of running out of memory, or of preventing pings from reaching the server properly.
+         //       We will need to live with blocking sends until producer flow control is enabled
+         sendMessageInChunks(true, msg, scheduledDeliveryTime);
+         //sendMessageInChunks(sendBlocking, msg, scheduledDeliveryTime);
       }
       else
       {

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-10-27 16:57:12 UTC (rev 5184)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-10-27 21:12:24 UTC (rev 5185)
@@ -678,71 +678,15 @@
 
    public void handleReceiveChunk(final long consumerID, final SessionSendChunkMessage chunk) throws Exception
    {
-      ClientMessage currentChunkMessage;
+      ClientConsumerInternal consumer = consumers.get(consumerID);
 
-      if (chunk.getHeader() != null)
+      if (consumer != null)
       {
-
-         // The Header only comes on the first message, so a buffer has to be created on the client
-         // to hold either a file or a big message
-         MessagingBuffer header = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getHeader()));
-
-         currentChunkMessage = createLargeMessage(consumerID, header);
-
-         if (currentChunkMessage instanceof FileClientMessage)
-         {
-            FileClientMessage fileMessage = (FileClientMessage)currentChunkMessage;
-            addBytesBody(fileMessage, chunk.getBody());
-         }
-         else
-         {
-            MessagingBuffer initialBody = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getBody()));
-            currentChunkMessage.setBody(initialBody);
-         }
-
-         currentChunk.put(consumerID, currentChunkMessage);
+         consumer.handleChunk(chunk);
       }
-      else
-      {
-         // No header.. this is then a continuation of a previous message
-         ByteBuffer body = ByteBuffer.wrap(chunk.getBody());
-
-         currentChunkMessage = currentChunk.get(consumerID);
-
-         if (currentChunkMessage instanceof FileClientMessage)
-         {
-            FileClientMessage fileMessage = (FileClientMessage)currentChunkMessage;
-            addBytesBody(fileMessage, chunk.getBody());
-         }
-         else
-         {
-            MessagingBuffer currentBody = currentChunkMessage.getBody();
-
-            MessagingBuffer newBody = new ByteBufferWrapper(ByteBuffer.allocate(currentBody.limit() + body.limit()));
-
-            newBody.putBytes(currentBody.array());
-            newBody.putBytes(body.array());
-
-            currentChunkMessage.setBody(newBody);
-         }
-      }
-
-      if (!chunk.isContinues())
-      {
-         if (currentChunkMessage instanceof FileClientMessage)
-         {
-            ((FileClientMessage)currentChunkMessage).closeChannel();
-         }
-         handleReceiveMessage(consumerID, currentChunkMessage);
-      }
+      
    }
 
-   private void addBytesBody(FileClientMessage fileMessage, byte[] body) throws Exception
-   {
-      FileChannel channel = fileMessage.getChannel();
-      channel.write(ByteBuffer.wrap(body));
-   }
-
    public void receiveProducerCredits(final long producerID, final int credits) throws Exception
    {
       ClientProducerInternal producer = producers.get(producerID);
@@ -786,18 +730,6 @@
       doCleanup();
    }
 
-   public ClientMessage createLargeMessage(final long consumerID, final MessagingBuffer header) throws Exception
-   {
-      ClientConsumerInternal consumer = consumers.get(consumerID);
-
-      if (consumer == null)
-      {
-         throw new IllegalStateException("No Consumer with ID = " + consumerID);
-      }
-
-      return consumer.createFileMessage(header);
-   }
-
    // Needs to be synchronized to prevent issues with occurring concurrently with close()
    public synchronized boolean handleFailover(final RemotingConnection backupConnection)
    {

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-10-27 16:57:12 UTC (rev 5184)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-10-27 21:12:24 UTC (rev 5185)
@@ -56,6 +56,4 @@
 
    boolean handleFailover(final RemotingConnection backupConnection);
 
-   ClientMessage createLargeMessage(long consumerID, MessagingBuffer header) throws Exception;
-
 }

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-10-27 16:57:12 UTC (rev 5184)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-10-27 21:12:24 UTC (rev 5185)
@@ -26,7 +26,7 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -36,15 +36,15 @@
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.remoting.DelayedResult;
 import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
 import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
@@ -91,7 +91,7 @@
 
    private final Lock lock = new ReentrantLock();
 
-   private final AtomicInteger availableCredits;
+   private final Semaphore availableCredits;
 
    private boolean started;
 
@@ -140,7 +140,7 @@
 
       if (enableFlowControl)
       {
-         availableCredits = new AtomicInteger(0);
+         availableCredits = new Semaphore(0);
       }
       else
       {
@@ -170,7 +170,7 @@
 
    public HandleStatus handle(final MessageReference ref) throws Exception
    {
-      if (availableCredits != null && availableCredits.get() <= 0)
+      if (availableCredits != null && availableCredits.availablePermits() <= 0)
       {
          return HandleStatus.BUSY;
       }
@@ -203,11 +203,6 @@
             return HandleStatus.NO_MATCH;
          }
 
-         if (availableCredits != null)
-         {
-            availableCredits.addAndGet(-message.getEncodeSize());
-         }
-
          final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount() + 1);
 
          if (!browseOnly)
@@ -232,7 +227,14 @@
                {
                   public void run()
                   {
-                     sendChunks((ServerLargeMessage)message);
+                     try
+                     {
+                        sendChunks((ServerLargeMessage)message);
+                     }
+                     catch (Exception e)
+                     {
+                        log.error(e);
+                     }
                   }
                });
                
@@ -241,6 +243,11 @@
          else
          {
    
+            if (availableCredits != null)
+            {
+               availableCredits.release(message.getEncodeSize());
+            }
+
             if (result == null)
             {
                // Not replicated - just send now
@@ -362,7 +369,11 @@
    {
       if (availableCredits != null)
       {
-         int previous = availableCredits.getAndAdd(credits);
+         int previous;
+         
+         // previous will be negative only at the first call, hence it is not necessary to make the next call atomic
+         previous = availableCredits.availablePermits(); 
+         availableCredits.release(credits);
 
          if (previous <= 0 && previous + credits > 0)
          {
@@ -454,7 +465,7 @@
     * @param message
     * @throws MessagingException
     */
-   private void sendChunks(ServerLargeMessage message)
+   private void sendChunks(ServerLargeMessage message) throws Exception
    {
       int headerSize = message.getPropertiesEncodeSize();
 
@@ -473,6 +484,12 @@
                                                                   bodyBuffer.array(),
                                                                   bodyLength < bodySize,
                                                                   false);
+      
+      if (availableCredits != null)
+      {
+         availableCredits.acquire(chunk.getPacketSize());
+      }
+      
       channel.send(chunk);
 
       for (int pos = bodyLength; pos < bodySize;)
@@ -484,6 +501,11 @@
 
          chunk = new SessionSendChunkMessage(id, null, bodyBuffer.array(), pos + bodyLength < bodySize, false);
 
+         if (availableCredits != null)
+         {
+            availableCredits.acquire(chunk.getPacketSize());
+         }
+
          channel.send(chunk);
 
          pos += bodyLength;

Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-10-27 16:57:12 UTC (rev 5184)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-10-27 21:12:24 UTC (rev 5185)
@@ -49,8 +49,6 @@
 import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
 import org.jboss.messaging.core.remoting.impl.RemotingServiceImpl;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.impl.ServerMessageImpl;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.tests.integration.chunkmessage.mock.MockConnector;
 import org.jboss.messaging.tests.integration.chunkmessage.mock.MockConnectorFactory;
@@ -83,6 +81,16 @@
 
    // Public --------------------------------------------------------
 
+   // Validate the functions to create and verify files 
+   public void testFiles() throws Exception
+   {
+      clearData();
+      
+      File file = createLargeFile("test.tst", 13333);
+      
+      checkFileRead(file, 13333);
+   }
+   
    public void testFailureOnSendingFile() throws Exception
    {
       clearData();
@@ -192,7 +200,7 @@
 
    public void testMessageChunkNullPersistence() throws Exception
    {
-      testInternal(false, false, 100, 5000, false, 0);
+      testInternal(false, false, 1, 5000, false, 0);
    }
 
    public void testMessageChunkNullPersistenceDelayed() throws Exception
@@ -207,13 +215,12 @@
 
    public void testMessageChunkFilePersistence100M() throws Exception
    {
-      testInternal(true, true, 1, 268435456, true, 0);
-      //testInternal(true, true, 1, 26214400, false, 0);
+      testInternal(true, true, 1, 26214400, false, 0);
    }
 
    public void testMessageChunkFilePersistenceDelayed() throws Exception
    {
-      testInternal(true, false, 1000, 50000, false, 1000);
+      testInternal(true, false, 1, 50000, false, 1000);
    }
 
    public void testSendfileMessage() throws Exception
@@ -336,7 +343,12 @@
 
    }
 
-   private void testPageOnLargeMessage(final boolean realFiles, final boolean sendBlocking) throws Exception
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected void testPageOnLargeMessage(final boolean realFiles, final boolean sendBlocking) throws Exception
    {
 
       clearData();
@@ -482,7 +494,7 @@
       ByteBuffer ioBuffer = ByteBuffer.allocate(DataConstants.SIZE_INT * numberOfIntegers);
       MessagingBuffer body = new ByteBufferWrapper(ioBuffer);
 
-      for (int i = 1; i <= numberOfIntegers; i++)
+      for (int i = 0; i < numberOfIntegers; i++)
       {
          body.putInt(i);
       }
@@ -492,7 +504,7 @@
       
    }
 
-   public void testInternal(final boolean realFiles,
+   protected void testInternal(final boolean realFiles,
                             final boolean useFile,
                             final int numberOfMessages,
                             final int numberOfIntegers,
@@ -591,7 +603,7 @@
 
          for (int i = 0; i < numberOfMessages; i++)
          {
-            ClientMessage message = consumer.receive(1000 + delayDelivery);
+            ClientMessage message = consumer.receive(60000 + delayDelivery);
             
             assertNotNull(message);
             
@@ -644,10 +656,6 @@
       }
    }
 
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
    @Override
    protected void setUp() throws Exception
    {
@@ -658,9 +666,7 @@
    {
    }
 
-   // Private -------------------------------------------------------
-
-   private FileClientMessage createLargeClientMessage(final ClientSession session, final int numberOfIntegers) throws Exception
+   protected FileClientMessage createLargeClientMessage(final ClientSession session, final int numberOfIntegers) throws Exception
    {
 
       FileClientMessage clientMessage = session.createFileMessage(true);
@@ -679,7 +685,7 @@
     * @throws FileNotFoundException
     * @throws IOException
     */
-   private File createLargeFile(String name, final int numberOfIntegers) throws FileNotFoundException, IOException
+   protected File createLargeFile(String name, final int numberOfIntegers) throws FileNotFoundException, IOException
    {
       File tmpFile = new File(temporaryDir + "/" + name);
       
@@ -694,16 +700,16 @@
       {
          if (buffer.position() > 0 && i % 1000 == 0)
          {
-            buffer.rewind();
+            buffer.flip();
             channel.write(buffer);
-            buffer.rewind();
+            buffer.clear();
          }
          buffer.putInt(i);
       }
       
       if (buffer.position() > 0)
       {
-         buffer.rewind();
+         buffer.flip();
          channel.write(buffer);
       }
 
@@ -723,7 +729,7 @@
     * @throws FileNotFoundException
     * @throws IOException
     */
-   private void readMessage(final ClientSession session, final SimpleString queueToRead, final int numberOfIntegers) throws MessagingException,
+   protected void readMessage(final ClientSession session, final SimpleString queueToRead, final int numberOfIntegers) throws MessagingException,
                                                                                                                     FileNotFoundException,
                                                                                                                     IOException
    {
@@ -757,28 +763,34 @@
     * @throws FileNotFoundException
     * @throws IOException
     */
-   private void checkFileRead(final File receivedFile, final int numberOfIntegers) throws FileNotFoundException,
+   protected void checkFileRead(final File receivedFile, final int numberOfIntegers) throws FileNotFoundException,
                                                                                   IOException
    {
       RandomAccessFile random2 = new RandomAccessFile(receivedFile, "r");
       FileChannel channel2 = random2.getChannel();
 
-      ByteBuffer buffer2 = ByteBuffer.allocate(4);
+      ByteBuffer buffer2 = ByteBuffer.allocate(1000 * 4);
 
       channel2.position(0l);
-
-      for (int i = 0; i < numberOfIntegers; i++)
+      
+      for (int i = 0; i < numberOfIntegers;)
       {
-         buffer2.rewind();
          channel2.read(buffer2);
-         buffer2.rewind();
+         
+         buffer2.flip();
+         for (int j = 0; j < buffer2.limit() / 4; j++, i++)
+         {
+            assertEquals(i, buffer2.getInt());
+         }
 
-         assertEquals(i, buffer2.getInt());
+         buffer2.clear();
       }
 
       channel2.close();
    }
 
+   // Private -------------------------------------------------------
+
    // Inner classes -------------------------------------------------
 
 }

Added: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java	                        (rev 0)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java	2008-10-27 21:12:24 UTC (rev 5185)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.soak.chunk;
+
+import org.jboss.messaging.tests.integration.chunkmessage.MessageChunkTest;
+
+/**
+ * Soak variant of {@link MessageChunkTest}: reruns the file-persistence chunk test with several very large messages.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Oct 27, 2008 3:44:37 PM
+ *
+ *
+ */
+public class MessageChunkSoakTest extends MessageChunkTest
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   public void testMessageChunkFilePersistence1G() throws Exception
+   {
+      testInternal(true, true, 2, 268435456, false, 0);
+   }
+
+   public void testMessageChunkFilePersistence100M() throws Exception
+   {
+      testInternal(true, true, 10, 26214400, false, 0);
+   }
+
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}




More information about the jboss-cvs-commits mailing list