[jboss-cvs] JBoss Messaging SVN: r5354 - in trunk: src/main/org/jboss/messaging/core/paging and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Nov 13 16:55:14 EST 2008


Author: clebert.suconic at jboss.com
Date: 2008-11-13 16:55:14 -0500 (Thu, 13 Nov 2008)
New Revision: 5354

Added:
   trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java
Removed:
   trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
   trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServiceImpl.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1450 - proper shutdown/stop on Journal and Paging

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-11-13 21:55:14 UTC (rev 5354)
@@ -312,7 +312,14 @@
       }
       finally
       {
-         rwlock.readLock().unlock();
+         try
+         {
+            rwlock.readLock().unlock();
+         }
+         catch (Exception ignored)
+         {
+            // This could happen if the thread was interrupted
+         }
       }
    }
 
@@ -350,7 +357,14 @@
       }
       finally
       {
-         rwlock.readLock().unlock();
+         try
+         {
+            rwlock.readLock().unlock();
+         }
+         catch (Exception ignored)
+         {
+            // This could happen if the thread was interrupted
+         }
       }
    }
 
@@ -385,7 +399,14 @@
       }
       finally
       {
-         rwlock.readLock().unlock();
+         try
+         {
+            rwlock.readLock().unlock();
+         }
+         catch (Exception ignored)
+         {
+            // This could happen if the thread was interrupted
+         }
       }
    }
 
@@ -424,7 +445,14 @@
       }
       finally
       {
-         rwlock.readLock().unlock();
+         try
+         {
+            rwlock.readLock().unlock();
+         }
+         catch (Exception ignored)
+         {
+            // This could happen if the thread was interrupted
+         }
       }
    }
 
@@ -461,7 +489,14 @@
       }
       finally
       {
-         rwlock.readLock().unlock();
+         try
+         {
+            rwlock.readLock().unlock();
+         }
+         catch (Exception ignored)
+         {
+            // This could happen if the thread was interrupted
+         }
       }
    }
 
@@ -497,7 +532,14 @@
       }
       finally
       {
-         rwlock.readLock().unlock();
+         try
+         {
+            rwlock.readLock().unlock();
+         }
+         catch (Exception ignored)
+         {
+            // This could happen if the thread was interrupted
+         }
       }
    }
 
@@ -535,7 +577,14 @@
       }
       finally
       {
-         rwlock.readLock().unlock();
+         try
+         {
+            rwlock.readLock().unlock();
+         }
+         catch (Exception ignored)
+         {
+            // This could happen if the thread was interrupted
+         }
       }
 
       // We should wait this outside of the lock, to increase throuput
@@ -590,7 +639,14 @@
       }
       finally
       {
-         rwlock.readLock().unlock();
+         try
+         {
+            rwlock.readLock().unlock();
+         }
+         catch (Exception ignored)
+         {
+            // This could happen if the thread was interrupted
+         }
       }
 
       // We should wait this outside of the lock, to increase throuput
@@ -636,7 +692,14 @@
       }
       finally
       {
-         rwlock.readLock().unlock();
+         try
+         {
+            rwlock.readLock().unlock();
+         }
+         catch (Exception ignored)
+         {
+            // This could happen if the thread was interrupted
+         }
       }
 
       // We should wait this outside of the lock, to increase throuput
@@ -1513,37 +1576,50 @@
 
    public synchronized void stop() throws Exception
    {
+      trace("Stopping the journal");
+      
       if (state == STATE_STOPPED)
       {
          throw new IllegalStateException("Journal is already stopped");
       }
 
-      if (currentFile != null)
+      positionLock.acquire();
+      rwlock.writeLock().lock();
+
+      try
       {
-         currentFile.getFile().close();
-      }
+         filesExecutor.shutdown();
 
-      filesExecutor.shutdown();
+         if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
+         {
+            log.warn("Couldn't stop journal executor after 60 seconds");
+         }
 
-      if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
-      {
-         log.warn("Couldn't stop journal executor after 60 seconds");
-      }
+         if (currentFile != null)
+         {
+            currentFile.getFile().close();
+         }
 
-      for (JournalFile file : openedFiles)
-      {
-         file.getFile().close();
-      }
+         for (JournalFile file : openedFiles)
+         {
+            file.getFile().close();
+         }
 
-      currentFile = null;
+         currentFile = null;
 
-      dataFiles.clear();
+         dataFiles.clear();
 
-      freeFiles.clear();
+         freeFiles.clear();
 
-      openedFiles.clear();
+         openedFiles.clear();
 
-      state = STATE_STOPPED;
+         state = STATE_STOPPED;
+      }
+      finally
+      {
+         positionLock.release();
+         rwlock.writeLock().unlock();
+      }
    }
 
    // Public
@@ -1855,6 +1931,11 @@
 
       try
       {
+         if (state != STATE_LOADED)
+         {
+            throw new IllegalStateException("The journal was stopped");
+         }
+
          int size = bb.limit();
 
          if (size % currentFile.getFile().getAlignment() != 0)

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-11-13 21:55:14 UTC (rev 5354)
@@ -58,7 +58,11 @@
 public interface PagingManager extends MessagingComponent
 {
 
+   /** The system is paging because of global-page-mode */
    boolean isGlobalPageMode();
+   
+   /** During startup, PostOffice may set GlobalPageMode to true */
+   void setGlobalPageMode(boolean globalMode);
 
    /** To return the PageStore associated with the address */
    PagingStore getPageStore(SimpleString address) throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java	2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java	2008-11-13 21:55:14 UTC (rev 5354)
@@ -38,6 +38,8 @@
    PagingStore newStore(org.jboss.messaging.util.SimpleString destinationName, QueueSettings queueSettings);
 
    Executor getPagingExecutor();
+   
+   void stop() throws InterruptedException;
 
    void setPagingManager(PagingManager manager);
 

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java	2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java	2008-11-13 21:55:14 UTC (rev 5354)
@@ -24,7 +24,9 @@
 
 import java.io.File;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
@@ -50,7 +52,7 @@
 
    private final String directory;
 
-   private final Executor executor;
+   private final ExecutorService executor;
 
    private PagingManager pagingManager;
 
@@ -64,18 +66,18 @@
       executor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-depaging-threads"));
    }
 
-   public PagingManagerFactoryNIO(final String directory, final Executor executor)
-   {
-      this.directory = directory;
-      this.executor = executor;
-   }
-
    // Public --------------------------------------------------------
 
    public Executor getPagingExecutor()
    {
       return executor;
    }
+   
+   public void stop() throws InterruptedException
+   {
+      executor.shutdown();
+      executor.awaitTermination(60, TimeUnit.SECONDS);
+   }
 
    public PagingStore newStore(final SimpleString destinationName, final QueueSettings settings)
    {

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-11-13 21:55:14 UTC (rev 5354)
@@ -125,8 +125,13 @@
    {
       return globalMode.get();
    }
+   
+   public void setGlobalPageMode(boolean globalMode)
+   {
+      this.globalMode.set(globalMode);
+   }
 
-   //FIXME - this is not thread safe
+   // Synchronization of this method is done per ConcurrentHashMap
    public PagingStore getPageStore(final SimpleString storeName) throws Exception
    {
       PagingStore store = stores.get(storeName);
@@ -261,6 +266,8 @@
       }
 
       storageManager.commit(depageTransactionID);
+      
+      trace("Depage committed");
 
       for (MessageReference ref : refsToAdd)
       {
@@ -341,6 +348,8 @@
    public void stop() throws Exception
    {
       started = false;
+      
+      pagingSPI.stop();
 
       for (PagingStore store : stores.values())
       {
@@ -454,7 +463,7 @@
       {
          try
          {
-            while (globalSize.get() < maxGlobalSize)
+            while (globalSize.get() < maxGlobalSize && started)
             {
                boolean depaged = false;
                // Round robin depaging one page at the time from each
@@ -483,7 +492,7 @@
                }
             }
 
-            if (globalSize.get() < maxGlobalSize)
+            if (globalSize.get() < maxGlobalSize && started)
             {
 
                globalMode.set(false);

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-11-13 21:55:14 UTC (rev 5354)
@@ -25,7 +25,9 @@
 import java.text.DecimalFormat;
 import java.util.List;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -75,7 +77,7 @@
 
    private final PagingManager pagingManager;
 
-   private final Executor executor;
+   private final ExecutorService executor;
 
    // Bytes consumed by the queue on the memory
    private final AtomicLong sizeInBytes = new AtomicLong();
@@ -96,7 +98,7 @@
 
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-   private volatile boolean initialized = false;
+   private volatile boolean running = false;
 
    private volatile LastPageRecord lastPageRecord;
 
@@ -108,7 +110,7 @@
                           final SequentialFileFactory fileFactory,
                           final SimpleString storeName,
                           final QueueSettings queueSettings,
-                          final Executor executor)
+                          final ExecutorService executor)
    {
       this.fileFactory = fileFactory;
       this.storeName = storeName;
@@ -418,18 +420,22 @@
 
    public synchronized boolean isStarted()
    {
-      return initialized;
+      return running;
    }
 
    public synchronized void stop() throws Exception
    {
-      if (initialized)
+      if (running)
       {
          lock.writeLock().lock();
 
          try
          {
-            initialized = false;
+            running = false;
+            
+            executor.shutdown();
+            executor.awaitTermination(60, TimeUnit.SECONDS);
+            
             if (currentPage != null)
             {
                currentPage.close();
@@ -445,7 +451,7 @@
    public synchronized void start() throws Exception
    {
 
-      if (initialized)
+      if (running)
       {
          // don't throw an exception.
          // You could have two threads adding PagingStore to a
@@ -483,7 +489,7 @@
             }
          }
 
-         initialized = true;
+         running = true;
 
          if (numberOfPages != 0)
          {
@@ -635,12 +641,11 @@
       {
          try
          {
-            boolean needMorePages = false;
-            do
+            boolean needMorePages = true;
+            while (needMorePages && running)
             {
                needMorePages = readPage();
             }
-            while (needMorePages);
          }
          catch (Exception e)
          {

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-11-13 21:55:14 UTC (rev 5354)
@@ -25,6 +25,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -453,13 +454,35 @@
       }
 
       storageManager.loadMessages(this, queues, resourceManager);
+      
+      // TODO: This is related to http://www.jboss.com/index.html?module=bb&op=viewtopic&t=145597
+      HashSet<SimpleString> addresses = new HashSet<SimpleString>();
+      
+      for (Binding binding: bindings)
+      {
+         addresses.add(binding.getAddress());
+      }
+      
+      for (SimpleString destination: dests)
+      {
+         addresses.add(destination);
+      }
+      
+      // End TODO -------------------------------------
 
-      for (SimpleString destination : dests)
+      for (SimpleString destination : addresses)
       {
          if (!pagingManager.isGlobalPageMode())
          {
             PagingStore store = pagingManager.getPageStore(destination);
-            store.startDepaging();
+            if (store.isPaging() && store.getMaxSizeBytes() < 0)
+            {
+               pagingManager.setGlobalPageMode(true);
+            }
+            else
+            {
+               store.startDepaging();
+            }
          }
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServiceImpl.java	2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServiceImpl.java	2008-11-13 21:55:14 UTC (rev 5354)
@@ -140,8 +140,8 @@
    public void stop() throws Exception
    {
       remotingService.stop();
+      server.stop();
       storageManager.stop();
-      server.stop();
    }
    
    public MessagingServer getServer()

Deleted: trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java	2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java	2008-11-13 21:55:14 UTC (rev 5354)
@@ -1,193 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
- * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
- * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
- * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
- * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
- * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.stress.paging;
-
-import java.util.HashMap;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.tests.integration.base.IntegrationTestBase;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * This is an integration-tests that will take some time to run. TODO: Maybe this test belongs somewhere else?
- * 
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- */
-public class MultipleDestinationPagingTest extends IntegrationTestBase
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   MessagingService service;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   public void testGlobalPage() throws Exception
-   {
-      testPage(true);
-   }
-
-   public void testRegularPage() throws Exception
-   {
-      testPage(false);
-   }
-
-   public void testPage(boolean globalPage) throws Exception
-   {
-      Configuration config = createDefaultConfig();
-
-      HashMap<String, QueueSettings> settings = new HashMap<String, QueueSettings>();
-
-      if (globalPage)
-      {
-         config.setPagingMaxGlobalSizeBytes(20 * 1024 * 1024);
-         QueueSettings setting = new QueueSettings();
-         setting.setMaxSizeBytes(-1);
-         settings.put("page-adr", setting);
-      }
-      else
-      {
-         config.setPagingMaxGlobalSizeBytes(-1);
-         QueueSettings setting = new QueueSettings();
-         setting.setMaxSizeBytes(20 * 1024 * 1024);
-         settings.put("page-adr", setting);
-      }
-
-      service = createService(true, false, config, settings);
-      service.start();
-
-      ClientSessionFactory factory = createInVMFactory();
-      ClientSession session = null;
-
-      try
-      {
-         session = factory.createSession(false, false, false);
-
-         SimpleString address = new SimpleString("page-adr");
-         SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
-
-         session.createQueue(address, queue[0], null, true, false, true);
-         session.createQueue(address, queue[1], null, true, false, true);
-
-         ClientProducer prod = session.createProducer(address);
-
-         ClientMessage message = createBytesMessage(session, new byte[700], false);
-
-         int NUMBER_OF_MESSAGES = 60000;
-
-         for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
-         {
-            if (i % 10000 == 0)
-               System.out.println(i);
-            prod.send(message);
-         }
-
-         session.commit();
-
-         session.start();
-
-         int counters[] = new int[2];
-
-         ClientConsumer consumers[] = new ClientConsumer[] { session.createConsumer(queue[0]),
-                                                            session.createConsumer(queue[1]) };
-
-         int reads = 0;
-
-         while (true)
-         {
-            int msgs1 = readMessages(session, consumers[0], queue[0]);
-            if (reads++ == 0)
-            {
-               assertTrue(msgs1 > 0 && msgs1 < NUMBER_OF_MESSAGES);
-            }
-            int msgs2 = readMessages(session, consumers[1], queue[1]);
-            counters[0] += msgs1;
-            counters[1] += msgs2;
-
-            System.out.println("msgs1 = " + msgs1 + " msgs2 = " + msgs2);
-
-            if (msgs1 + msgs2 == 0)
-            {
-               break;
-            }
-         }
-
-         consumers[0].close();
-         consumers[1].close();
-
-         assertEquals(NUMBER_OF_MESSAGES, counters[0]);
-         assertEquals(NUMBER_OF_MESSAGES, counters[1]);
-      }
-      finally
-      {
-         session.close();
-         service.stop();
-      }
-
-   }
-
-   private int readMessages(ClientSession session, ClientConsumer consumer, SimpleString queue) throws MessagingException
-   {
-      session.start();
-      int msgs = 0;
-
-      ClientMessage msg = null;
-      do
-      {
-         msg = consumer.receive(1000);
-         if (msg != null)
-         {
-            msg.acknowledge();
-            if (++msgs % 10000 == 0)
-            {
-               System.out.println("received " + msgs);
-               session.commit();
-
-            }
-         }
-      }
-      while (msg != null);
-
-      session.commit();
-
-      return msgs;
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   protected void setUp() throws Exception
-   {
-      clearData();
-   }
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Copied: trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java (from rev 5347, trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java	2008-11-13 21:55:14 UTC (rev 5354)
@@ -0,0 +1,325 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.stress.paging;
+
+import java.util.HashMap;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.tests.integration.base.IntegrationTestBase;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * This is an integration test that will take some time to run. TODO: Maybe this test belongs somewhere else?
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ */
+public class PageStressTest extends IntegrationTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   MessagingService service;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   
+   public void testStopDuringGlobalDepage() throws Exception
+   {
+      testStopDuringDepage(true);
+   }
+   
+   public void testStopDuringRegularDepage() throws Exception
+   {
+      testStopDuringDepage(false);
+   }
+   
+   
+   public void testStopDuringDepage(boolean globalPage) throws Exception
+   {
+      HashMap<String, QueueSettings> settings = new HashMap<String, QueueSettings>();
+
+      Configuration config = createConfig(globalPage, settings);
+
+      service = createService(true, false, config, settings);
+      service.start();
+
+      ClientSessionFactory factory = createInVMFactory();
+      factory.setBlockOnAcknowledge(true);
+      ClientSession session = null;
+
+      try
+      {
+
+         final int NUMBER_OF_MESSAGES = 60000;
+         
+         session = factory.createSession(null, null, false, false, true, 1024 * NUMBER_OF_MESSAGES);
+
+         SimpleString address = new SimpleString("page-adr");
+
+         session.createQueue(address, address, null, true, false, true);
+
+         ClientProducer prod = session.createProducer(address);
+
+         ClientMessage message = createBytesMessage(session, new byte[700], true);
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
+         {
+            if (i % 10000 == 0)
+               System.out.println("Sent " + i);
+            prod.send(message);
+         }
+
+         session.commit();
+
+         session.start();
+
+         ClientConsumer consumer = session.createConsumer(address);
+
+
+         int msgs = 0;
+         ClientMessage msg = null;
+         do
+         {
+            msg = consumer.receive(1000);
+            if (msg != null)
+            {
+               msg.acknowledge();
+               if ((++msgs) % 1000 == 0)
+               {
+                  System.out.println("Received " + msgs);
+               }
+            }
+         } while (msg != null);
+
+         session.commit();
+         
+         session.close();
+         
+         service.stop();
+         
+         System.out.println("server stopped, nr msgs: " + msgs);
+
+         settings = new HashMap<String, QueueSettings>();
+         config = createConfig(globalPage, settings);
+
+         service = createService(true, false, config, settings);
+         service.start();
+         
+         
+         factory = createInVMFactory();
+         
+         session = factory.createSession(false, false, false);
+
+         consumer = session.createConsumer(address);
+         
+         session.start();
+         
+         msg = null;
+         do
+         {
+            msg = consumer.receive(1000);
+            if (msg != null)
+            {
+               msg.acknowledge();
+               session.commit();
+               if ((++msgs) % 1000 == 0)
+               {
+                  System.out.println("Received " + msgs);
+               }
+             }
+         } while (msg != null);
+         
+         System.out.println("msgs second time: " + msgs);
+         
+         assertEquals(NUMBER_OF_MESSAGES, msgs);
+      }
+      finally
+      {
+         session.close();
+         service.stop();
+      }
+
+   }
+
+   public void testGlobalPageOnMultipleDestinations() throws Exception
+   {
+      testPageOnMultipleDestinations(true);
+   }
+
+   public void testRegularPageOnMultipleDestinations() throws Exception
+   {
+      testPageOnMultipleDestinations(false);
+   }
+
+   public void testPageOnMultipleDestinations(boolean globalPage) throws Exception
+   {
+      HashMap<String, QueueSettings> settings = new HashMap<String, QueueSettings>();
+
+      Configuration config = createConfig(globalPage, settings);
+
+      service = createService(true, false, config, settings);
+      service.start();
+
+      ClientSessionFactory factory = createInVMFactory();
+      ClientSession session = null;
+
+      try
+      {
+         session = factory.createSession(false, false, false);
+
+         SimpleString address = new SimpleString("page-adr");
+         SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
+
+         session.createQueue(address, queue[0], null, true, false, true);
+         session.createQueue(address, queue[1], null, true, false, true);
+
+         ClientProducer prod = session.createProducer(address);
+
+         ClientMessage message = createBytesMessage(session, new byte[700], false);
+
+         int NUMBER_OF_MESSAGES = 60000;
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
+         {
+            if (i % 10000 == 0)
+               System.out.println(i);
+            prod.send(message);
+         }
+
+         session.commit();
+
+         session.start();
+
+         int counters[] = new int[2];
+
+         ClientConsumer consumers[] = new ClientConsumer[] { session.createConsumer(queue[0]),
+                                                            session.createConsumer(queue[1]) };
+
+         int reads = 0;
+
+         while (true)
+         {
+            int msgs1 = readMessages(session, consumers[0], queue[0]);
+            if (reads++ == 0)
+            {
+               assertTrue(msgs1 > 0 && msgs1 < NUMBER_OF_MESSAGES);
+            }
+            int msgs2 = readMessages(session, consumers[1], queue[1]);
+            counters[0] += msgs1;
+            counters[1] += msgs2;
+
+            System.out.println("msgs1 = " + msgs1 + " msgs2 = " + msgs2);
+
+            if (msgs1 + msgs2 == 0)
+            {
+               break;
+            }
+         }
+
+         consumers[0].close();
+         consumers[1].close();
+
+         assertEquals(NUMBER_OF_MESSAGES, counters[0]);
+         assertEquals(NUMBER_OF_MESSAGES, counters[1]);
+      }
+      finally
+      {
+         session.close();
+         service.stop();
+      }
+
+   }
+
+   private int readMessages(ClientSession session, ClientConsumer consumer, SimpleString queue) throws MessagingException
+   {
+      session.start();
+      int msgs = 0;
+
+      ClientMessage msg = null;
+      do
+      {
+         msg = consumer.receive(1000);
+         if (msg != null)
+         {
+            msg.acknowledge();
+            if (++msgs % 10000 == 0)
+            {
+               System.out.println("received " + msgs);
+               session.commit();
+
+            }
+         }
+      }
+      while (msg != null);
+
+      session.commit();
+
+      return msgs;
+   }
+   
+   /**
+    * @param globalPage
+    * @param settings
+    * @return
+    */
+   private Configuration createConfig(boolean globalPage, HashMap<String, QueueSettings> settings)
+   {
+      Configuration config = createDefaultConfig();
+
+      if (globalPage)
+      {
+         config.setPagingMaxGlobalSizeBytes(20 * 1024 * 1024);
+         QueueSettings setting = new QueueSettings();
+         setting.setMaxSizeBytes(-1);
+         settings.put("page-adr", setting);
+      }
+      else
+      {
+         config.setPagingMaxGlobalSizeBytes(-1);
+         QueueSettings setting = new QueueSettings();
+         setting.setMaxSizeBytes(20 * 1024 * 1024);
+         settings.put("page-adr", setting);
+      }
+      return config;
+   }
+
+   
+
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected void setUp() throws Exception
+   {
+      clearData();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}


Property changes on: trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java
___________________________________________________________________
Name: svn:mergeinfo
   + 

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java	2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java	2008-11-13 21:55:14 UTC (rev 5354)
@@ -121,7 +121,9 @@
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
       EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
+      
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
+      EasyMock.expect(pgm.isGlobalPageMode()).andStubReturn(true);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
@@ -133,11 +135,11 @@
                       (Map<Long, Queue>)EasyMock.anyObject(),
                       (ResourceManager)EasyMock.anyObject());
 
-      EasyMock.replay(pm, qf, binding, queue);
+      EasyMock.replay(pm, pgm, qf, binding, queue);
 
       postOffice.start();
 
-      EasyMock.verify(pm, qf, binding, queue);
+      EasyMock.verify(pm, pgm, qf, binding, queue);
 
       assertTrue(postOffice.isStarted());
       assertEquals(postOffice.getBinding(queueName), binding);
@@ -170,6 +172,7 @@
       QueueFactory qf = EasyMock.createStrictMock(QueueFactory.class);
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
+      EasyMock.expect(pgm.isGlobalPageMode()).andStubReturn(true);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
@@ -267,6 +270,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
 
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
+      EasyMock.expect(pgm.isGlobalPageMode()).andStubReturn(true);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java	2008-11-13 19:27:36 UTC (rev 5353)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java	2008-11-13 21:55:14 UTC (rev 5354)
@@ -66,6 +66,7 @@
       QueueFactory qf = EasyMock.createStrictMock(QueueFactory.class);
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
+      EasyMock.expect(pgm.isGlobalPageMode()).andStubReturn(true);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 




More information about the jboss-cvs-commits mailing list