[jboss-cvs] JBoss Messaging SVN: r5510 - in trunk: src/main/org/jboss/messaging/core/client and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Dec 10 16:03:01 EST 2008


Author: clebert.suconic at jboss.com
Date: 2008-12-10 16:03:00 -0500 (Wed, 10 Dec 2008)
New Revision: 5510

Modified:
   trunk/build-messaging.xml
   trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
   trunk/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java
   trunk/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java
   trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java
Log:
Fix the timing test suite; tweak large-message chunking and paging

Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml	2008-12-10 19:05:56 UTC (rev 5509)
+++ trunk/build-messaging.xml	2008-12-10 21:03:00 UTC (rev 5510)
@@ -1013,7 +1013,7 @@
 
    <target name="all-tests" depends="unit-tests, integration-tests, concurrent-tests, stress-tests, jms-tests"/>
 
-   <target name="hudson-tests" depends="unit-tests, integration-tests, concurrent-tests, jms-tests"/>
+   <target name="hudson-tests" depends="unit-tests, integration-tests, concurrent-tests, stress-tests, timing-tests, jms-tests"/>
 
    <target name="compile-reports">
       <mkdir dir="${test.stylesheets.dir}"/>

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java	2008-12-10 19:05:56 UTC (rev 5509)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java	2008-12-10 21:03:00 UTC (rev 5510)
@@ -23,11 +23,7 @@
 package org.jboss.messaging.core.client;
 
 import java.io.File;
-import java.nio.channels.FileChannel;
 
-import org.jboss.messaging.core.client.impl.ClientMessageInternal;
-import org.jboss.messaging.core.exception.MessagingException;
-
 /**
  * A ClientFileMessage
  *
@@ -37,7 +33,7 @@
  *
  *
  */
-public interface ClientFileMessage extends ClientMessageInternal
+public interface ClientFileMessage extends ClientMessage
 {
    File getFile();
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-12-10 19:05:56 UTC (rev 5509)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-12-10 21:03:00 UTC (rev 5510)
@@ -29,6 +29,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.util.Future;
 
@@ -333,7 +334,7 @@
       }
    }
 
-   public synchronized void handleLargeMessage(final byte[] header) throws Exception
+   public synchronized void handleLargeMessage(final SessionReceiveMessage packet) throws Exception
    {
       if (closing)
       {
@@ -342,9 +343,9 @@
       }
 
       // Flow control for the first packet, we will have others
-      flowControl(header.length);
+      flowControl(packet.getPacketSize());
 
-      currentChunkMessage = createFileMessage(header);
+      currentChunkMessage = createFileMessage(packet.getLargeMessageHeader());
    }
 
    public synchronized void handleLargeMessageContinuation(final SessionReceiveContinuationMessage chunk) throws Exception
@@ -358,7 +359,7 @@
 
       if (chunk.isContinues())
       {
-         flowControl(chunk.getBody().length);
+         flowControl(chunk.getPacketSize());
       }
 
       if (isFileConsumer())
@@ -392,7 +393,7 @@
             ((ClientFileMessageInternal)currentChunkMessage).closeChannel();
          }
 
-         currentChunkMessage.setFlowControlSize(chunk.getBody().length);
+         currentChunkMessage.setFlowControlSize(chunk.getPacketSize());
 
          ClientMessageInternal msgToSend = currentChunkMessage;
 
@@ -606,12 +607,29 @@
          {
             channel.sendBlocking(new SessionConsumerCloseMessage(id));
          }
+         
+         clearBuffer();
       }
       finally
       {
          session.removeConsumer(this);
       }
    }
+   
+   private void clearBuffer()
+   {
+      if (isFileConsumer())
+      {
+         for (ClientMessage message: buffer)
+         {
+            if (message instanceof ClientFileMessage)
+            {
+               ((ClientFileMessage) message).getFile().delete();
+            }
+         }
+      }
+      buffer.clear();
+   }
 
    private void doAck(final ClientMessage message) throws MessagingException
    {
@@ -622,12 +640,12 @@
       session.acknowledge(id, message.getMessageID());
    }
 
-   private ClientFileMessage cloneAsFileMessage(final ClientMessageInternal message) throws Exception
+   private ClientMessageInternal cloneAsFileMessage(final ClientMessageInternal message) throws Exception
    {
       if (message instanceof ClientFileMessageImpl)
       {
          // nothing to be done
-         return (ClientFileMessage)message;
+         return message;
       }
       else
       {

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-12-10 19:05:56 UTC (rev 5509)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-12-10 21:03:00 UTC (rev 5510)
@@ -26,6 +26,7 @@
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
 
 /**
  * 
@@ -40,7 +41,7 @@
 
    void handleMessage(ClientMessageInternal message) throws Exception;
 
-   void handleLargeMessage(byte[] largeMessageHeader) throws Exception;
+   void handleLargeMessage(SessionReceiveMessage largeMessageHeader) throws Exception;
    
    void handleLargeMessageContinuation(SessionReceiveContinuationMessage continuation) throws Exception;
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-12-10 19:05:56 UTC (rev 5509)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-12-10 21:03:00 UTC (rev 5510)
@@ -25,7 +25,6 @@
 import java.nio.ByteBuffer;
 
 import org.jboss.messaging.core.client.AcknowledgementHandler;
-import org.jboss.messaging.core.client.ClientFileMessage;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.message.Message;
@@ -229,7 +228,7 @@
 
       if (msg.getEncodeSize() > minLargeMessageSize)
       {
-         sendMessageInChunks(true, msg);
+         sendMessageInChunks(sendBlocking, msg);
       }
       else if (sendBlocking)
       {
@@ -266,26 +265,18 @@
 
       for (int pos = 0; pos < bodySize;)
       {
-         final int chunkLength;
          final boolean lastChunk;
                   
-         final int bytesToWrite = bodySize - pos;
+         final int chunkLength = Math.min(bodySize - pos, minLargeMessageSize); 
          
-         if (bytesToWrite < minLargeMessageSize)
-         {
-            lastChunk = true;
-            chunkLength = bytesToWrite;
-         }
-         else
-         {
-            lastChunk = false;
-            chunkLength = minLargeMessageSize;
-         }
-         
          final MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(chunkLength));
 
          msg.encodeBody(bodyBuffer, pos, chunkLength);
 
+         pos += chunkLength;
+         
+         lastChunk = pos >= bodySize;
+
          final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.array(), !lastChunk, lastChunk && sendBlocking);
 
          if (sendBlocking && lastChunk)
@@ -298,7 +289,6 @@
             channel.send(chunk);
          }
 
-         pos += chunkLength;
       }
 
       if (msg instanceof ClientFileMessageInternal)

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-12-10 19:05:56 UTC (rev 5509)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-12-10 21:03:00 UTC (rev 5510)
@@ -61,6 +61,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -639,13 +640,14 @@
       }
    }
    
-   public void handleReceiveLargeMessage(final long consumerID, final byte[] headerBytes) throws Exception
+   public void handleReceiveLargeMessage(final long consumerID, final SessionReceiveMessage receiveMessage) throws Exception
    {
       ClientConsumerInternal consumer = consumers.get(consumerID);
 
       if (consumer != null)
       {
-         consumer.handleLargeMessage(headerBytes);
+         consumer.handleLargeMessage(receiveMessage);
+         
       }
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-12-10 19:05:56 UTC (rev 5509)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-12-10 21:03:00 UTC (rev 5510)
@@ -16,6 +16,7 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
 
 /**
  * A ClientSessionInternal
@@ -27,7 +28,7 @@
    String getName();
 
    void acknowledge(long consumerID, long messageID) throws MessagingException;
-   
+
    void expire(long consumerID, long messageID) throws MessagingException;
 
    void addConsumer(ClientConsumerInternal consumer);
@@ -39,12 +40,12 @@
    void removeProducer(ClientProducerInternal producer);
 
    void handleReceiveMessage(long consumerID, ClientMessageInternal message) throws Exception;
-      
-   void handleReceiveLargeMessage(final long consumerID, final byte[] headerBytes) throws Exception;
 
-   void handleReceiveContinuation(final long consumerID, final SessionReceiveContinuationMessage continuation) throws Exception;
+   void handleReceiveLargeMessage(long consumerID, SessionReceiveMessage headerBytes) throws Exception;
 
+   void handleReceiveContinuation(long consumerID, SessionReceiveContinuationMessage continuation) throws Exception;
+
    void handleFailover(RemotingConnection backupConnection);
-   
+
    RemotingConnection getConnection();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2008-12-10 19:05:56 UTC (rev 5509)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2008-12-10 21:03:00 UTC (rev 5510)
@@ -77,7 +77,7 @@
                
                if (message.isLargeMessage())
                {
-                  clientSession.handleReceiveLargeMessage(message.getConsumerID(), message.getLargeMessageHeader());
+                  clientSession.handleReceiveLargeMessage(message.getConsumerID(), message);
                }
                else
                {

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-12-10 19:05:56 UTC (rev 5509)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-12-10 21:03:00 UTC (rev 5510)
@@ -277,6 +277,7 @@
 
    public synchronized void startGlobalDepage()
    {
+      setGlobalPageMode(true);
       for (PagingStore store : stores.values())
       {
          store.startDepaging(pagingSPI.getGlobalDepagerExecutor());

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-12-10 19:05:56 UTC (rev 5509)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-12-10 21:03:00 UTC (rev 5510)
@@ -657,7 +657,7 @@
                if (availableCredits != null)
                {
                   // RequiredBufferSize on this case represents the right number of bytes sent
-                  availableCredits.addAndGet(-headerBuffer.limit());
+                  availableCredits.addAndGet(-initialMessage.getRequiredBufferSize());
                }
             }
 
@@ -669,7 +669,7 @@
 
                if (availableCredits != null)
                {
-                  availableCredits.addAndGet(-chunkLen);
+                  availableCredits.addAndGet(-readAheadChunk.getRequiredBufferSize());
                }
 
                channel.send(readAheadChunk);
@@ -694,7 +694,7 @@
 
                if (availableCredits != null)
                {
-                  availableCredits.addAndGet(-chunkLen);
+                  availableCredits.addAndGet(-chunk.getRequiredBufferSize());
                }
 
                channel.send(chunk);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java	2008-12-10 19:05:56 UTC (rev 5509)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java	2008-12-10 21:03:00 UTC (rev 5510)
@@ -86,31 +86,33 @@
    protected void testChunks(final boolean realFiles,
                              final boolean useFile,
                              final boolean preAck,
+                             final boolean sendingBlocking,
                              final int numberOfMessages,
                              final int numberOfIntegers,
-                             final boolean sendingBlocking,
                              final int waitOnConsumer,
                              final long delayDelivery) throws Exception
    {
       testChunks(realFiles,
                  useFile,
                  preAck,
+                 sendingBlocking,
                  numberOfMessages,
                  numberOfIntegers,
-                 sendingBlocking,
                  waitOnConsumer,
                  delayDelivery,
+                 -1,
                  false);
    }
 
    protected void testChunks(final boolean realFiles,
                              final boolean useFile,
                              final boolean preAck,
+                             final boolean sendingBlocking,
                              final int numberOfMessages,
                              final int numberOfIntegers,
-                             final boolean sendingBlocking,
                              final int waitOnConsumer,
                              final long delayDelivery,
+                             final int producerWindow,
                              final boolean testTime) throws Exception
    {
 
@@ -129,6 +131,11 @@
             sf.setBlockOnPersistentSend(true);
             sf.setBlockOnAcknowledge(true);
          }
+         
+         if (producerWindow > 0)
+         {
+            sf.setSendWindowSize(producerWindow);
+         }
 
          ClientSession session = sf.createSession(null, null, false, true, false, preAck, 0);
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-12-10 19:05:56 UTC (rev 5509)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-12-10 21:03:00 UTC (rev 5510)
@@ -42,6 +42,7 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.message.Message;
+import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
 import org.jboss.messaging.core.remoting.impl.RemotingServiceImpl;
@@ -205,35 +206,115 @@
 
       checkFileRead(file, 13333);
    }
+   
+   public void testClearOnClientBuffer() throws Exception
+   {
+      clearData();
 
+      messagingService = createService(true);
+      messagingService.start();
+
+      final int numberOfIntegers = 10;
+      final int numberOfMessages = 100;
+      
+      try
+      {
+         ClientSessionFactory sf = createInVMFactory();
+
+         sf.setBlockOnNonPersistentSend(true);
+         sf.setBlockOnPersistentSend(true);
+         sf.setBlockOnAcknowledge(true);
+
+         ClientSession session = sf.createSession(null, null, false, true, false, false, 0);
+
+         session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
+         
+         messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         File tmpData = createLargeFile(getTemporaryDir(), "someFile.dat", numberOfIntegers);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage message = session.createFileMessage(true);
+            ((ClientFileMessage)message).setFile(tmpData);
+            message.putIntProperty(new SimpleString("counter-message"), i);
+            System.currentTimeMillis();
+            producer.send(message);
+         }
+
+
+         ClientConsumer consumer = session.createFileConsumer(new File(getClientLargeMessagesDir()), ADDRESS);;
+
+         File clientfiles = new File(getClientLargeMessagesDir());
+         
+         session.start();
+         
+         ClientMessage msg = consumer.receive(1000);
+         msg.acknowledge();
+         
+         for (int i = 0; i < 100; i++)
+         {
+            if (clientfiles.listFiles().length > 0)
+            {
+               break;
+            }
+            Thread.sleep(100);
+         }
+         
+         assertTrue(clientfiles.listFiles().length > 0);
+
+         session.close();
+         
+         
+         assertEquals(1, clientfiles.list().length); // 1 message was received, that should be kept
+
+         validateNoFilesOnLargeDir();
+         
+
+      }
+      finally
+      {
+         try
+         {
+            messagingService.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+      
+   }
+
    public void testMessageChunkFilePersistence() throws Exception
    {
-      testChunks(true, false, false, 100, 262144, false, RECEIVE_WAIT_TIME, 0);
+      testChunks(true, false, false, false, 100, 262144, RECEIVE_WAIT_TIME, 0);
    }
 
    public void testMessageChunkFilePersistenceBlocked() throws Exception
    {
-      testChunks(true, false, false, 100, 262144, true, RECEIVE_WAIT_TIME, 0);
+      testChunks(true, false, false, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
    }
 
    public void testMessageChunkFilePersistenceBlockedPreCommit() throws Exception
    {
-      testChunks(true, false, true, 100, 262144, true, RECEIVE_WAIT_TIME, 0);
+      testChunks(true, false, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
    }
 
    public void testMessageChunkFilePersistenceDelayed() throws Exception
    {
-      testChunks(true, false, false, 1, 50000, false, RECEIVE_WAIT_TIME, 2000);
+      testChunks(true, false, false, false, 1, 50000, RECEIVE_WAIT_TIME, 2000);
    }
 
    public void testMessageChunkNullPersistence() throws Exception
    {
-      testChunks(false, false, false, 1, 50000, false, RECEIVE_WAIT_TIME, 0);
+      testChunks(false, false, false, false, 1, 50000, RECEIVE_WAIT_TIME, 0);
    }
 
    public void testMessageChunkNullPersistenceDelayed() throws Exception
    {
-      testChunks(false, false, false, 100, 50000, false, RECEIVE_WAIT_TIME, 100);
+      testChunks(false, false, false, false, 100, 50000, RECEIVE_WAIT_TIME, 100);
    }
 
    public void testPageOnLargeMessage() throws Exception
@@ -250,44 +331,44 @@
 
    public void testSendfileMessage() throws Exception
    {
-      testChunks(true, true, false, 100, 50000, false, RECEIVE_WAIT_TIME, 0);
+      testChunks(true, true, false, false, 100, 50000, RECEIVE_WAIT_TIME, 0);
 
    }
 
    public void testSendfileMessageOnNullPersistence() throws Exception
    {
-      testChunks(false, true, false, 100, 50000, false, RECEIVE_WAIT_TIME, 0);
+      testChunks(false, true, false, false, 100, 50000, RECEIVE_WAIT_TIME, 0);
    }
 
    public void testSendfileMessageOnNullPersistenceSmallMessage() throws Exception
    {
-      testChunks(false, true, false, 100, 100, true, RECEIVE_WAIT_TIME, 0);
+      testChunks(false, true, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
    }
 
    public void testSendfileMessageSmallMessage() throws Exception
    {
-      testChunks(true, true, false, 100, 4, false, RECEIVE_WAIT_TIME, 0);
+      testChunks(true, true, false, false, 100, 4, RECEIVE_WAIT_TIME, 0);
 
    }
 
    public void testSendRegularMessageNullPersistence() throws Exception
    {
-      testChunks(false, false, false, 100, 100, false, RECEIVE_WAIT_TIME, 0);
+      testChunks(false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 0);
    }
 
    public void testSendRegularMessageNullPersistenceDelayed() throws Exception
    {
-      testChunks(false, false, false, 100, 100, false, RECEIVE_WAIT_TIME, 1000);
+      testChunks(false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
    }
 
    public void testSendRegularMessagePersistence() throws Exception
    {
-      testChunks(true, false, false, 100, 100, false, RECEIVE_WAIT_TIME, 0);
+      testChunks(true, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 0);
    }
 
    public void testSendRegularMessagePersistenceDelayed() throws Exception
    {
-      testChunks(true, false, false, 100, 100, false, RECEIVE_WAIT_TIME, 1000);
+      testChunks(true, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
    }
 
    public void testTwoBindingsTwoStartedConsumers() throws Exception

Modified: trunk/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java	2008-12-10 19:05:56 UTC (rev 5509)
+++ trunk/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java	2008-12-10 21:03:00 UTC (rev 5510)
@@ -48,7 +48,7 @@
 
    public void testMessageChunkFilePersistence1G() throws Exception
    {
-      testChunks(true, true, false, 2, 268435456, false, 300000, 0, true);
+      testChunks(true, true, false, true, 2, 268435456, 300000, 0, -1, true);
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java	2008-12-10 19:05:56 UTC (rev 5509)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java	2008-12-10 21:03:00 UTC (rev 5510)
@@ -45,15 +45,15 @@
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
-
-   public void testMessageChunkFilePersistence100M() throws Exception
+   
+   public void setUp() throws Exception
    {
-      testChunks(true, true, false, 10, 26214400, false, 120000, 0);
+      super.setUp();
    }
-
+   
    public void testMessageChunkFilePersistence1M() throws Exception
    {
-      testChunks(true, true, false, 1000, 262144, false, 120000, 0);
+      testChunks(true, true, false, true, 1000, 262144, 120000, 0, -1, false);
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java	2008-12-10 19:05:56 UTC (rev 5509)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java	2008-12-10 21:03:00 UTC (rev 5510)
@@ -243,96 +243,6 @@
       assertTrue(consumer.getReferences().isEmpty());
    }
 
-   public void testDeleteAllReferences() throws Exception
-   {
-      Queue queue = new QueueImpl(1, new SimpleString("queue1"), null, false, true, false, scheduledExecutor, null);
-
-      StorageManager storageManager = EasyMock.createStrictMock(StorageManager.class);
-
-      final int numMessages = 10;
-
-      List<MessageReference> refs = new ArrayList<MessageReference>();
-
-      for (int i = 0; i < numMessages; i++)
-      {
-         MessageReference ref = generateReference(queue, i);
-
-         ref.getMessage().setDurable(i % 2 == 0);
-
-         refs.add(ref);
-
-         queue.addLast(ref);
-      }
-
-      //Add some scheduled too
-
-      final long waitTime = 2000;
-
-      final int numScheduled = 10;
-
-      for (int i = numMessages; i < numMessages + numScheduled; i++)
-      {
-         MessageReference ref = generateReference(queue, i);
-
-         ref.setScheduledDeliveryTime(System.currentTimeMillis() + waitTime);
-
-         ref.getMessage().setDurable(i % 2 == 0);
-
-         refs.add(ref);
-
-         queue.addLast(ref);
-      }
-
-
-      assertEquals(numMessages + numScheduled, queue.getMessageCount());
-      assertEquals(numScheduled, queue.getScheduledCount());
-      assertEquals(0, queue.getDeliveringCount());
-
-      //What I expect to get
-
-      EasyMock.expect(storageManager.generateUniqueID()).andReturn(1L);
-
-      for (int i = 0; i < numMessages; i++)
-      {
-      	if (i % 2 == 0)
-      	{
-      		storageManager.deletePageTransactional(1, i);
-      	}
-      }
-
-      for (int i = numMessages; i < numMessages + numScheduled; i++)
-      {
-      	if (i % 2 == 0)
-      	{
-      		storageManager.deletePageTransactional(1, i);
-      	}
-      }
-
-      storageManager.commit(1);
-
-      EasyMock.replay(storageManager);
-
-      queue.deleteAllReferences(storageManager);
-
-      EasyMock.verify(storageManager);
-
-      assertEquals(0, queue.getMessageCount());
-      assertEquals(0, queue.getScheduledCount());
-      assertEquals(0, queue.getDeliveringCount());
-
-      Thread.sleep(waitTime + 500);
-
-      //Make sure scheduled don't arrive
-
-      FakeConsumer consumer = new FakeConsumer();
-
-      queue.addConsumer(consumer);
-
-      queue.deliverNow();
-
-      assertTrue(consumer.getReferences().isEmpty());
-   }
-
    public void testDeliveryScheduled() throws Exception
    {
       Consumer consumer = EasyMock.createStrictMock(Consumer.class);
@@ -358,29 +268,4 @@
 
    }
 
-   public void testDeliveryScheduledBusyConsumer() throws Exception
-   {
-      Consumer consumer = EasyMock.createStrictMock(Consumer.class);
-      Queue queue = new QueueImpl(1, queue1, null, false, true, false, scheduledExecutor, null);
-      MessageReference messageReference = generateReference(queue, 1);
-      final CountDownLatch countDownLatch = new CountDownLatch(1);
-      EasyMock.expect(consumer.handle(messageReference)).andAnswer(new IAnswer<HandleStatus>()
-      {
-         public HandleStatus answer() throws Throwable
-         {
-            countDownLatch.countDown();
-            return HandleStatus.BUSY;
-         }
-      });
-      EasyMock.expect(consumer.handle(messageReference)).andReturn(HandleStatus.HANDLED);
-      EasyMock.replay(consumer);
-      queue.addConsumer(consumer);
-      messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);
-      queue.addFirst(messageReference);
-
-      countDownLatch.await(3000, TimeUnit.MILLISECONDS);
-
-      EasyMock.verify(consumer);
-
-   }
 }




More information about the jboss-cvs-commits mailing list