[jboss-cvs] JBoss Messaging SVN: r6615 - in trunk: src/config/jboss-as/non-clustered and 31 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Apr 29 02:26:48 EDT 2009


Author: timfox
Date: 2009-04-29 02:26:47 -0400 (Wed, 29 Apr 2009)
New Revision: 6615

Removed:
   trunk/src/main/org/jboss/messaging/core/memory/
Modified:
   trunk/src/config/jboss-as/clustered/jbm-configuration.xml
   trunk/src/config/jboss-as/non-clustered/jbm-configuration.xml
   trunk/src/config/stand-alone/clustered/jbm-configuration.xml
   trunk/src/config/stand-alone/non-clustered/jbm-configuration.xml
   trunk/src/main/org/jboss/messaging/core/config/Configuration.java
   trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
   trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
   trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
   trunk/src/main/org/jboss/messaging/utils/OrderedExecutorFactory.java
   trunk/src/schemas/jbm-configuration.xsd
   trunk/tests/config/ConfigurationTest-config.xml
   trunk/tests/config/ConfigurationTest-full-config.xml
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlUsingCoreTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
   trunk/tests/src/org/jboss/messaging/tests/performance/journal/RealJournalImplAIOTest.java
   trunk/tests/src/org/jboss/messaging/tests/performance/persistence/StorageManagerTimingTest.java
   trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java
   trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java
   trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/RealJournalImplAIOTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueDeployerTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingManagerITest.java
Log:
mainly https://jira.jboss.org/jira/browse/JBMESSAGING-1525

Modified: trunk/src/config/jboss-as/clustered/jbm-configuration.xml
===================================================================
--- trunk/src/config/jboss-as/clustered/jbm-configuration.xml	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/config/jboss-as/clustered/jbm-configuration.xml	2009-04-29 06:26:47 UTC (rev 6615)
@@ -6,7 +6,7 @@
       <clustered>true</clustered>
       
       <!-- Maximum number of threads to use for scheduled deliveries -->
-      <scheduled-max-pool-size>30</scheduled-max-pool-size>
+      <scheduled-thread-pool-max-size>30</scheduled-thread-pool-max-size>
 
       <security-enabled>true</security-enabled>
       

Modified: trunk/src/config/jboss-as/non-clustered/jbm-configuration.xml
===================================================================
--- trunk/src/config/jboss-as/non-clustered/jbm-configuration.xml	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/config/jboss-as/non-clustered/jbm-configuration.xml	2009-04-29 06:26:47 UTC (rev 6615)
@@ -6,7 +6,7 @@
       <clustered>false</clustered>
                   
       <!-- Maximum number of threads to use for scheduled deliveries -->
-      <scheduled-max-pool-size>30</scheduled-max-pool-size>
+      <scheduled-thread-pool-max-size>30</scheduled-thread-pool-max-size>
 
       <security-enabled>true</security-enabled>
       

Modified: trunk/src/config/stand-alone/clustered/jbm-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/clustered/jbm-configuration.xml	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/config/stand-alone/clustered/jbm-configuration.xml	2009-04-29 06:26:47 UTC (rev 6615)
@@ -6,7 +6,7 @@
       <clustered>true</clustered>
       
       <!-- Maximum number of threads to use for scheduled deliveries -->
-      <scheduled-max-pool-size>30</scheduled-max-pool-size>
+      <scheduled-thread-pool-max-size>30</scheduled-thread-pool-max-size>
 
       <security-enabled>true</security-enabled>
       

Modified: trunk/src/config/stand-alone/non-clustered/jbm-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/non-clustered/jbm-configuration.xml	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/config/stand-alone/non-clustered/jbm-configuration.xml	2009-04-29 06:26:47 UTC (rev 6615)
@@ -6,7 +6,7 @@
       <clustered>false</clustered>
                   
       <!-- Maximum number of threads to use for scheduled deliveries -->
-      <scheduled-max-pool-size>30</scheduled-max-pool-size>
+      <scheduled-thread-pool-max-size>30</scheduled-thread-pool-max-size>
 
       <security-enabled>true</security-enabled>
       

Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -75,7 +75,11 @@
    long getQueueActivationTimeout();
 
    void setQueueActivationTimeout(long timeout);
+   
+   int getThreadPoolMaxSize();
 
+   void setThreadPoolMaxSize(int maxSize);
+
    int getScheduledThreadPoolMaxSize();
 
    void setScheduledThreadPoolMaxSize(int maxSize);
@@ -206,10 +210,6 @@
 
    // Paging Properties --------------------------------------------------------------------
 
-   int getPagingMaxThreads();
-
-   void setPagingMaxThread(int pagingMaxThreads);
-
    String getPagingDirectory();
 
    void setPagingDirectory(String dir);

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -55,7 +55,9 @@
 
    public static final long DEFAULT_QUEUE_ACTIVATION_TIMEOUT = 30000;
 
-   public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 30;
+   public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 10;
+   
+   public static final int DEFAULT_THREAD_POOL_MAX_SIZE = -1;
 
    public static final long DEFAULT_SECURITY_INVALIDATION_INTERVAL = 10000;
 
@@ -158,6 +160,8 @@
    protected long queueActivationTimeout = DEFAULT_QUEUE_ACTIVATION_TIMEOUT;
 
    protected int scheduledThreadPoolMaxSize = DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+   
+   protected int threadPoolMaxSize = DEFAULT_THREAD_POOL_MAX_SIZE;
 
    protected long securityInvalidationInterval = DEFAULT_SECURITY_INVALIDATION_INTERVAL;
 
@@ -205,8 +209,6 @@
 
    protected String pagingDirectory = DEFAULT_PAGING_DIR;
 
-   protected int pagingMaxThreads = DEFAULT_PAGE_MAX_THREADS;
-
    // File related attributes -----------------------------------------------------------
 
    protected String largeMessagesDirectory = DEFAULT_LARGE_MESSAGES_DIR;
@@ -355,7 +357,17 @@
    {
       scheduledThreadPoolMaxSize = maxSize;
    }
+   
+   public int getThreadPoolMaxSize()
+   {
+      return threadPoolMaxSize;
+   }
 
+   public void setThreadPoolMaxSize(final int maxSize)
+   {
+      threadPoolMaxSize = maxSize;
+   }
+
    public long getSecurityInvalidationInterval()
    {
       return securityInvalidationInterval;
@@ -531,16 +543,6 @@
       return journalType;
    }
 
-   public int getPagingMaxThreads()
-   {
-      return pagingMaxThreads;
-   }
-
-   public void setPagingMaxThread(final int pagingMaxThreads)
-   {
-      this.pagingMaxThreads = pagingMaxThreads;
-   }
-
    public void setPagingDirectory(final String dir)
    {
       pagingDirectory = dir;

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -107,7 +107,9 @@
 
       // NOTE! All the defaults come from the super class
 
-      scheduledThreadPoolMaxSize = getInteger(e, "scheduled-max-pool-size", scheduledThreadPoolMaxSize);
+      scheduledThreadPoolMaxSize = getInteger(e, "scheduled-thread-pool-max-size", scheduledThreadPoolMaxSize);
+      
+      threadPoolMaxSize = getInteger(e, "thread-pool-max-size", threadPoolMaxSize);
 
       securityEnabled = getBoolean(e, "security-enabled", securityEnabled);
 
@@ -265,8 +267,6 @@
 
       journalDirectory = getString(e, "journal-directory", journalDirectory);
 
-      pagingMaxThreads = getInteger(e, "paging-max-threads", pagingMaxThreads);
-
       pagingDirectory = getString(e, "paging-directory", pagingDirectory);
 
       pagingMaxGlobalSize = getLong(e, "paging-max-global-size-bytes", pagingMaxGlobalSize);

Modified: trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -30,7 +30,6 @@
 import javax.management.openmbean.TabularData;
 
 import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.utils.SimpleString;
 
 /**
  * This interface describes the core management interface exposed by the server
@@ -52,6 +51,8 @@
    boolean isClustered();
 
    int getScheduledThreadPoolMaxSize();
+   
+   int getThreadPoolMaxSize();
 
    long getSecurityInvalidationInterval();
 
@@ -186,8 +187,6 @@
 
    boolean isPersistIDCache();
 
-   int getPagingMaxThreads();
-
    int getPagingGlobalWatermarkSize();
 
    String getLargeMessagesDirectory();

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -203,6 +203,11 @@
    {
       return configuration.getScheduledThreadPoolMaxSize();
    }
+   
+   public int getThreadPoolMaxSize()
+   {
+      return configuration.getThreadPoolMaxSize();
+   }
 
    public long getSecurityInvalidationInterval()
    {
@@ -584,11 +589,6 @@
       return configuration.getPagingGlobalWatermarkSize();
    }
 
-   public int getPagingMaxThreads()
-   {
-      return configuration.getPagingMaxThreads();
-   }
-
    public long getQueueActivationTimeout()
    {
       return configuration.getQueueActivationTimeout();

Modified: trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -148,6 +148,11 @@
    {
       return localControl.getScheduledThreadPoolMaxSize();
    }
+   
+   public int getThreadPoolMaxSize()
+   {
+      return localControl.getThreadPoolMaxSize();
+   }
 
    public long getSecurityInvalidationInterval()
    {
@@ -364,11 +369,6 @@
       return localControl.getPagingGlobalWatermarkSize();
    }
 
-   public int getPagingMaxThreads()
-   {
-      return localControl.getPagingMaxThreads();
-   }
-
    public long getQueueActivationTimeout()
    {
       return localControl.getQueueActivationTimeout();

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -33,9 +33,6 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
@@ -47,7 +44,7 @@
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.utils.OrderedExecutorFactory;
+import org.jboss.messaging.utils.ExecutorFactory;
 import org.jboss.messaging.utils.SimpleString;
 import org.jboss.messaging.utils.UUIDGenerator;
 
@@ -69,10 +66,8 @@
 
    private final String directory;
 
-   private final ExecutorService parentExecutor;
+   private final ExecutorFactory executorFactory;
 
-   private final OrderedExecutorFactory executorFactory;
-
    private final Executor globalDepagerExecutor;
 
    private PagingManager pagingManager;
@@ -85,14 +80,12 @@
 
    // Constructors --------------------------------------------------
 
-   public PagingStoreFactoryNIO(final String directory, final int maxThreads)
+   public PagingStoreFactoryNIO(final String directory, final ExecutorFactory executorFactory)
    {
       this.directory = directory;
 
-      parentExecutor = Executors.newFixedThreadPool(maxThreads, new org.jboss.messaging.utils.JBMThreadFactory("JBM-depaging-threads"));
+      this.executorFactory = executorFactory;
 
-      executorFactory = new org.jboss.messaging.utils.OrderedExecutorFactory(parentExecutor);
-
       globalDepagerExecutor = executorFactory.getExecutor();
    }
 
@@ -103,11 +96,8 @@
       return globalDepagerExecutor;
    }
 
-   public void stop() throws InterruptedException
+   public void stop()
    {
-      parentExecutor.shutdown();
-
-      parentExecutor.awaitTermination(30, TimeUnit.SECONDS);
    }
 
    public synchronized PagingStore newStore(final SimpleString destinationName, final AddressSettings settings) throws Exception

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -32,9 +32,8 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.transaction.xa.Xid;
@@ -76,7 +75,6 @@
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 import org.jboss.messaging.utils.DataConstants;
 import org.jboss.messaging.utils.IDGenerator;
-import org.jboss.messaging.utils.JBMThreadFactory;
 import org.jboss.messaging.utils.Pair;
 import org.jboss.messaging.utils.SimpleString;
 import org.jboss.messaging.utils.UUID;
@@ -93,16 +91,15 @@
 public class JournalStorageManager implements StorageManager
 {
    private static final Logger log = Logger.getLogger(JournalStorageManager.class);
-   
+
    private static final long CHECKPOINT_BATCH_SIZE = 2 ^ 32;
 
-
    // Bindings journal record type
 
    public static final byte QUEUE_BINDING_RECORD = 21;
 
    public static final byte PERSISTENT_ID_RECORD = 23;
-   
+
    public static final byte ID_COUNTER_RECORD = 24;
 
    // type + expiration + timestamp + priority
@@ -125,7 +122,7 @@
    public static final byte SET_SCHEDULED_DELIVERY_TIME = 36;
 
    public static final byte DUPLICATE_ID = 37;
-   
+
    private UUID persistentID;
 
    private final BatchingIDGenerator idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE);
@@ -138,17 +135,17 @@
 
    private volatile boolean started;
 
-   private final ExecutorService executor;
-   
-   public JournalStorageManager(final Configuration config)
+   private final Executor executor;
+
+   public JournalStorageManager(final Configuration config, final Executor executor)
    {
-      this.executor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-journal-storage-manager"));
+      this.executor = executor;
 
       if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO)
       {
          throw new IllegalArgumentException("Only NIO and AsyncIO are supported journals");
       }
-      
+
       String bindingsDir = config.getBindingsDirectory();
 
       if (bindingsDir == null)
@@ -214,41 +211,30 @@
       largeMessagesFactory = new NIOSequentialFileFactory(config.getLargeMessagesDirectory());
    }
 
-   /* This constructor is only used for testing */
-   public JournalStorageManager(final Journal messageJournal,
-                                final Journal bindingsJournal,
-                                final SequentialFileFactory largeMessagesFactory)
-   {
-      this.executor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-journal-storage-manager"));
-      this.messageJournal = messageJournal;
-      this.bindingsJournal = bindingsJournal;
-      this.largeMessagesFactory = largeMessagesFactory;
-   }
-   
    public UUID getPersistentID()
-   {   
+   {
       return persistentID;
    }
-   
+
    public void setPersistentID(UUID id) throws Exception
    {
       long recordID = generateUniqueID();
-      
+
       if (id != null)
       {
-         bindingsJournal.appendAddRecord(recordID, PERSISTENT_ID_RECORD, new PersistentIDEncoding(id), true);                       
+         bindingsJournal.appendAddRecord(recordID, PERSISTENT_ID_RECORD, new PersistentIDEncoding(id), true);
       }
-      
+
       this.persistentID = id;
    }
 
    public long generateUniqueID()
    {
       long id = idGenerator.generateID();
-      
+
       return id;
    }
-   
+
    public long getCurrentUniqueID()
    {
       return idGenerator.getCurrentID();
@@ -258,7 +244,7 @@
    {
       idGenerator.setID(id);
    }
-   
+
    public LargeServerMessage createLargeMessage()
    {
       return new JournalLargeServerMessage(this);
@@ -466,7 +452,7 @@
                                   final ResourceManager resourceManager,
                                   final Map<Long, Queue> queues,
                                   final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
-   {      
+   {
       List<RecordInfo> records = new ArrayList<RecordInfo>();
 
       List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
@@ -679,11 +665,7 @@
          }
       }
 
-      loadPreparedTransactions(pagingManager,
-                               resourceManager,
-                               queues,
-                               preparedTransactions,
-                               duplicateIDMap);
+      loadPreparedTransactions(pagingManager, resourceManager, queues, preparedTransactions, duplicateIDMap);
    }
 
    private void loadPreparedTransactions(final PagingManager pagingManager,
@@ -713,7 +695,7 @@
          {
             byte[] data = record.data;
 
-            MessagingBuffer buff = ChannelBuffers.wrappedBuffer(data); 
+            MessagingBuffer buff = ChannelBuffers.wrappedBuffer(data);
 
             byte recordType = record.getUserRecordType();
 
@@ -838,7 +820,7 @@
          {
             byte[] data = record.data;
 
-            MessagingBuffer buff = ChannelBuffers.wrappedBuffer(data); 
+            MessagingBuffer buff = ChannelBuffers.wrappedBuffer(data);
 
             long messageID = record.id;
 
@@ -899,13 +881,13 @@
    }
 
    public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos) throws Exception
-   {      
+   {
       List<RecordInfo> records = new ArrayList<RecordInfo>();
 
       List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
 
       bindingsJournal.load(records, preparedTransactions);
-      
+
       long lastID = -1;
 
       for (RecordInfo record : records)
@@ -925,21 +907,21 @@
             bindingEncoding.setPersistenceID(id);
 
             queueBindingInfos.add(bindingEncoding);
-         }        
+         }
          else if (rec == PERSISTENT_ID_RECORD)
          {
             PersistentIDEncoding encoding = new PersistentIDEncoding();
-            
+
             encoding.decode(buffer);
-            
+
             persistentID = encoding.uuid;
          }
          else if (rec == ID_COUNTER_RECORD)
          {
             IDCounterEncoding encoding = new IDCounterEncoding();
-            
+
             encoding.decode(buffer);
-            
+
             lastID = encoding.id;
          }
          else
@@ -950,8 +932,6 @@
 
       idGenerator.setID(lastID + 1);
    }
-      
-   
 
    // MessagingComponent implementation
    // ------------------------------------------------------
@@ -962,9 +942,9 @@
       {
          return;
       }
-           
+
       cleanupIncompleteFiles();
-      
+
       bindingsJournal.start();
 
       messageJournal.start();
@@ -978,18 +958,14 @@
       {
          return;
       }
-      
-      //Must call close to make sure last id is persisted
+
+      // Must call close to make sure last id is persisted
       idGenerator.close();
 
-      executor.shutdown();
-
       bindingsJournal.stop();
 
       messageJournal.stop();
 
-      executor.awaitTermination(60, TimeUnit.SECONDS);
-      
       persistentID = null;
 
       started = false;
@@ -1019,7 +995,6 @@
    {
       this.executor.execute(new Runnable()
       {
-
          public void run()
          {
             try
@@ -1108,11 +1083,11 @@
 
          nextID = start + checkpointSize;
       }
-      
+
       public void setID(final long id)
       {
          this.counter.set(id);
-         
+
          nextID = id + checkpointSize;
       }
 
@@ -1146,12 +1121,12 @@
       {
          return counter.get();
       }
-      
+
       public void close()
       {
          storeID(counter.get());
       }
-      
+
       private void storeID(final long id)
       {
          try
@@ -1164,7 +1139,7 @@
          }
       }
    }
-   
+
    private static class XidEncoding implements EncodingSupport
    {
       final Xid xid;
@@ -1293,7 +1268,7 @@
       }
 
    }
-   
+
    private static class PersistentIDEncoding implements EncodingSupport
    {
       UUID uuid;
@@ -1310,9 +1285,9 @@
       public void decode(final MessagingBuffer buffer)
       {
          byte[] bytes = new byte[16];
-         
+
          buffer.readBytes(bytes);
-         
+
          uuid = new UUID(UUID.TYPE_TIME_BASED, bytes);
       }
 
@@ -1327,7 +1302,7 @@
       }
 
    }
-   
+
    private static class IDCounterEncoding implements EncodingSupport
    {
       long id;

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -30,8 +30,6 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.jboss.messaging.core.buffers.ChannelBuffers;
 import org.jboss.messaging.core.client.management.impl.ManagementHelper;
@@ -100,11 +98,13 @@
 
    private final ManagementService managementService;
 
-   private ScheduledThreadPoolExecutor messageExpiryExecutor;
+   private Thread expiryReaper;
 
-   private final long messageExpiryScanPeriod;
+   private final long reaperPeriod;
 
-   private final int messageExpiryThreadPriority;
+   private final int reaperPriority;
+   
+   private Reaper reaper;
 
    private final ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = new ConcurrentHashMap<SimpleString, DuplicateIDCache>();
 
@@ -135,8 +135,8 @@
                          final PagingManager pagingManager,
                          final QueueFactory bindableFactory,
                          final ManagementService managementService,
-                         final long messageExpiryScanPeriod,
-                         final int messageExpiryThreadPriority,
+                         final long reaperPeriod,
+                         final int reaperPriority,
                          final boolean enableWildCardRouting,
                          final boolean backup,
                          final int idCacheSize,
@@ -155,9 +155,9 @@
 
       this.pagingManager = pagingManager;
 
-      this.messageExpiryScanPeriod = messageExpiryScanPeriod;
+      this.reaperPeriod = reaperPeriod;
 
-      this.messageExpiryThreadPriority = messageExpiryThreadPriority;
+      this.reaperPriority = reaperPriority;
 
       if (enableWildCardRouting)
       {
@@ -181,7 +181,7 @@
 
    // MessagingComponent implementation ---------------------------------------
 
-   public void start() throws Exception
+   public synchronized void start() throws Exception
    {
       managementService.addNotificationListener(this);
 
@@ -200,31 +200,16 @@
 
       started = true;
    }
-
-   private void startExpiryScanner()
+     
+   public synchronized void stop() throws Exception
    {
-      if (messageExpiryScanPeriod > 0)
-      {
-         MessageExpiryRunner messageExpiryRunner = new MessageExpiryRunner();
-         messageExpiryExecutor = new ScheduledThreadPoolExecutor(1,
-                                                                 new org.jboss.messaging.utils.JBMThreadFactory("JBM-scheduled-threads",
-                                                                                                                messageExpiryThreadPriority));
-         messageExpiryExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
-         messageExpiryExecutor.scheduleWithFixedDelay(messageExpiryRunner,
-                                                      messageExpiryScanPeriod,
-                                                      messageExpiryScanPeriod,
-                                                      TimeUnit.MILLISECONDS);
-      }
-   }
-
-   public void stop() throws Exception
-   {
       managementService.removeNotificationListener(this);
 
-      if (messageExpiryExecutor != null)
+      if (reaper != null)
       {
-         messageExpiryExecutor.shutdown();
-         messageExpiryExecutor.awaitTermination(60, TimeUnit.SECONDS);
+         reaper.stop();
+         
+         expiryReaper.join();
       }
 
       addressManager.clear();
@@ -820,6 +805,20 @@
 
    // Private -----------------------------------------------------------------
 
+   private synchronized void startExpiryScanner()
+   {
+      if (reaperPeriod > 0)
+      {
+         reaper = new Reaper();
+         
+         expiryReaper = new Thread(reaper, "JBM-expiry-reaper");
+         
+         expiryReaper.setPriority(reaperPriority);
+         
+         expiryReaper.start();
+      }
+   }
+   
    private void routeDirect(final Queue queue, final ServerMessage message) throws Exception
    {
       if (queue.getFilter() == null || queue.getFilter().match(message))
@@ -890,38 +889,75 @@
          return oper;
       }
    }
-
-   private class MessageExpiryRunner implements Runnable
+   
+   private class Reaper implements Runnable
    {
+      private boolean closed;
+      
+      public synchronized void stop()
+      {
+         closed = true;
+         
+         notify();
+      }
+      
       public synchronized void run()
       {
-         Map<SimpleString, Binding> nameMap = addressManager.getBindings();
-
-         List<Queue> queues = new ArrayList<Queue>();
-
-         for (Binding binding : nameMap.values())
+         while (true)
          {
-            if (binding.getType() == BindingType.LOCAL_QUEUE)
+            long toWait = reaperPeriod;
+   
+            long start = System.currentTimeMillis();
+   
+            while (!closed && toWait > 0)
             {
-               Queue queue = (Queue)binding.getBindable();
-
-               queues.add(queue);
+               try
+               {
+                  wait(toWait);
+               }
+               catch (InterruptedException e)
+               {
+               }
+   
+               long now = System.currentTimeMillis();
+   
+               toWait -= now - start;
+   
+               start = now;
             }
-         }
-
-         for (Queue queue : queues)
-         {
-            try
+            
+            if (closed)
             {
-               queue.expireReferences();
+               return;
             }
-            catch (Exception e)
+   
+            Map<SimpleString, Binding> nameMap = addressManager.getBindings();
+   
+            List<Queue> queues = new ArrayList<Queue>();
+   
+            for (Binding binding : nameMap.values())
             {
-               log.error("failed to expire messages for queue " + queue.getName(), e);
+               if (binding.getType() == BindingType.LOCAL_QUEUE)
+               {
+                  Queue queue = (Queue)binding.getBindable();
+   
+                  queues.add(queue);
+               }
             }
+   
+            for (Queue queue : queues)
+            {
+               try
+               {
+                  queue.expireReferences();
+               }
+               catch (Exception e)
+               {
+                  log.error("failed to expire messages for queue " + queue.getName(), e);
+               }
+            }
          }
       }
-
    }
 
    private class PageMessageOperation implements TransactionOperation

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -113,7 +113,9 @@
    private boolean backup;
 
    private volatile boolean started;
-
+   
+   private int replicationCount;
+   
    /*
     * Constructor using static list of connectors
     */
@@ -505,8 +507,6 @@
          waitForReplicationsToComplete(3000);
       }
       
-      private int replicationCount;
-      
       private synchronized void waitForReplicationsToComplete(long timeout)
       {
          long toWait = timeout;

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -24,6 +24,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import javax.management.MBeanServer;
@@ -96,7 +97,9 @@
 import org.jboss.messaging.core.transaction.impl.ResourceManagerImpl;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 import org.jboss.messaging.core.version.Version;
+import org.jboss.messaging.utils.ExecutorFactory;
 import org.jboss.messaging.utils.Future;
+import org.jboss.messaging.utils.OrderedExecutorFactory;
 import org.jboss.messaging.utils.Pair;
 import org.jboss.messaging.utils.SimpleString;
 import org.jboss.messaging.utils.UUID;
@@ -143,18 +146,18 @@
 
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
 
-   private ScheduledExecutorService scheduledExecutor;
-
    private QueueFactory queueFactory;
 
    private PagingManager pagingManager;
 
    private PostOffice postOffice;
 
-   private ExecutorService asyncDeliveryPool;
+   private ExecutorService threadPool;
 
-   private org.jboss.messaging.utils.ExecutorFactory executorFactory;
+   private ScheduledExecutorService scheduledPool;
 
+   private ExecutorFactory executorFactory;
+
    private HierarchicalRepository<Set<Role>> securityRepository;
 
    private ResourceManager resourceManager;
@@ -270,10 +273,10 @@
       {
          clusterManager.stop();
       }
-      
-      //Need to flush all sessions to make sure all confirmations get sent back to client
-      
-      for (ServerSession session: sessions.values())
+
+      // Need to flush all sessions to make sure all confirmations get sent back to client
+
+      for (ServerSession session : sessions.values())
       {
          session.getChannel().flushConfirmations();
       }
@@ -300,20 +303,6 @@
 
       securityManager.stop();
 
-      asyncDeliveryPool.shutdown();
-
-      try
-      {
-         if (!asyncDeliveryPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
-         {
-            log.warn("Timed out waiting for pool to terminate");
-         }
-      }
-      catch (InterruptedException e)
-      {
-         // Ignore
-      }
-
       if (replicatingConnection != null)
       {
          try
@@ -329,19 +318,33 @@
       }
 
       pagingManager.stop();
+      resourceManager.stop();
+      postOffice.stop();
+
       pagingManager = null;
       securityStore = null;
-      resourceManager.stop();
       resourceManager = null;
-      postOffice.stop();
       postOffice = null;
       securityRepository = null;
       securityStore = null;
-      scheduledExecutor.shutdown();
       queueFactory = null;
       resourceManager = null;
       messagingServerControl = null;
 
+      scheduledPool.shutdown();
+      threadPool.shutdown();
+      try
+      {
+         if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+         {
+            log.warn("Timed out waiting for pool to terminate");
+         }
+      }
+      catch (InterruptedException e)
+      {
+         // Ignore
+      }
+
       sessions.clear();
 
       started = false;
@@ -561,14 +564,14 @@
          this.nodeID = new SimpleString(uuid.toString());
 
          initialisePart2();
-        
+
          long backupID = storageManager.getCurrentUniqueID();
 
          if (liveUniqueID != backupID)
          {
             initialised = false;
-            
-            throw new IllegalStateException("Live and backup unique ids different. You're probably trying to restart a live backup pair after a crash");            
+
+            throw new IllegalStateException("Live and backup unique ids different. You're probably trying to restart a live backup pair after a crash");
          }
 
          log.info("Backup server is now operational");
@@ -588,7 +591,7 @@
                                                                                                     ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
                                                                                                     ClientSessionFactoryImpl.DEFAULT_PING_PERIOD,
                                                                                                     ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
-                                                                                                    scheduledExecutor,
+                                                                                                    scheduledPool,
                                                                                                     listener);
 
             if (replicatingConnection == null)
@@ -771,8 +774,7 @@
 
    protected PagingManager createPagingManager()
    {
-      return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
-                                                             configuration.getPagingMaxThreads()),
+      return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(), executorFactory),
                                    storageManager,
                                    addressSettingsRepository,
                                    configuration.getPagingMaxGlobalSizeBytes(),
@@ -804,9 +806,9 @@
 
             for (Queue queue : toActivate)
             {
-               scheduledExecutor.schedule(new ActivateRunner(queue),
-                                          configuration.getQueueActivationTimeout(),
-                                          TimeUnit.MILLISECONDS);
+               scheduledPool.schedule(new ActivateRunner(queue),
+                                      configuration.getQueueActivationTimeout(),
+                                      TimeUnit.MILLISECONDS);
             }
 
             configuration.setBackup(false);
@@ -865,14 +867,24 @@
 
    private void initialisePart2() throws Exception
    {
-      // Create the pools and executor related objects
-      asyncDeliveryPool = Executors.newCachedThreadPool(new org.jboss.messaging.utils.JBMThreadFactory("JBM-async-session-delivery-threads"));
+      // Create the pools - we have two pools - one for non scheduled - and another for scheduled
 
-      executorFactory = new org.jboss.messaging.utils.OrderedExecutorFactory(asyncDeliveryPool);
+      ThreadFactory tFactory = new org.jboss.messaging.utils.JBMThreadFactory("JBM-threads");
 
-      scheduledExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(),
-                                                          new org.jboss.messaging.utils.JBMThreadFactory("JBM-scheduled-threads"));
+      if (configuration.getThreadPoolMaxSize() == -1)
+      {
+         threadPool = Executors.newCachedThreadPool(tFactory);
+      }
+      else
+      {
+         threadPool = Executors.newFixedThreadPool(configuration.getThreadPoolMaxSize(), tFactory);
+      }
 
+      executorFactory = new OrderedExecutorFactory(threadPool);
+
+      scheduledPool = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(),
+                                                      new org.jboss.messaging.utils.JBMThreadFactory("JBM-scheduled-threads"));
+
       // Create the hard-wired components
 
       if (configuration.isEnableFileDeployment())
@@ -882,7 +894,7 @@
 
       if (configuration.isEnablePersistence())
       {
-         storageManager = new JournalStorageManager(configuration);
+         storageManager = new JournalStorageManager(configuration, threadPool);
       }
       else
       {
@@ -899,7 +911,7 @@
                                             configuration.getManagementClusterPassword(),
                                             managementService);
 
-      queueFactory = new QueueFactoryImpl(scheduledExecutor, addressSettingsRepository, storageManager);
+      queueFactory = new QueueFactoryImpl(scheduledPool, addressSettingsRepository, storageManager);
 
       pagingManager = createPagingManager();
 
@@ -999,7 +1011,7 @@
          clusterManager = new ClusterManagerImpl(executorFactory,
                                                  this,
                                                  postOffice,
-                                                 scheduledExecutor,
+                                                 scheduledPool,
                                                  managementService,
                                                  configuration,
                                                  uuid,
@@ -1389,7 +1401,7 @@
 
       public void run()
       {
-         queue.activateNow(asyncDeliveryPool);
+         queue.activateNow(threadPool);
       }
    }
 
@@ -1416,13 +1428,13 @@
          if (conn != null)
          {
             // Execute on different thread to avoid deadlocks
-            new Thread()
+            threadPool.execute(new Runnable()
             {
                public void run()
                {
                   conn.fail(me);
                }
-            }.start();
+            });
          }
       }
    }

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -102,7 +102,7 @@
 
    private ClientBootstrap bootstrap;
 
-   ChannelGroup channelGroup;
+   private ChannelGroup channelGroup;
 
    private final BufferHandler handler;
 

Modified: trunk/src/main/org/jboss/messaging/utils/OrderedExecutorFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/utils/OrderedExecutorFactory.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/main/org/jboss/messaging/utils/OrderedExecutorFactory.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -47,7 +47,7 @@
    {
       this.parent = parent;
    }
-
+   
    /**
     * Get an executor that always executes tasks in order.
     *

Modified: trunk/src/schemas/jbm-configuration.xsd
===================================================================
--- trunk/src/schemas/jbm-configuration.xsd	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/src/schemas/jbm-configuration.xsd	2009-04-29 06:26:47 UTC (rev 6615)
@@ -28,15 +28,22 @@
 				   minOccurs="0" />	
 				<xsd:element ref="enable-persistence"  maxOccurs="1"
 				   minOccurs="0" />	
-				<xsd:element name="scheduled-max-pool-size"
+				<xsd:element name="scheduled-thread-pool-max-size"
 					type="xsd:int" maxOccurs="1" minOccurs="0">
 					<xsd:annotation>
 						<xsd:documentation>
-							Maximum number of threads to use for
-							scheduled deliveries
+							Maximum number of threads to use for the scheduled thread pool
 						</xsd:documentation>
 					</xsd:annotation>
 				</xsd:element>
+				<xsd:element name="thread-pool-max-size"
+					type="xsd:int" maxOccurs="1" minOccurs="0">
+					<xsd:annotation>
+						<xsd:documentation>
+							Maximum number of threads to use for the thread pool
+						</xsd:documentation>
+					</xsd:annotation>
+				</xsd:element>
 				<xsd:element name="security-enabled" type="xsd:boolean"
 					maxOccurs="1" minOccurs="0">
 				</xsd:element>

Modified: trunk/tests/config/ConfigurationTest-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-config.xml	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/config/ConfigurationTest-config.xml	2009-04-29 06:26:47 UTC (rev 6615)
@@ -2,7 +2,7 @@
    <configuration>
       <clustered>true</clustered>
       <enable-file-deployment>true</enable-file-deployment>
-      <scheduled-max-pool-size>12345</scheduled-max-pool-size>              
+      <scheduled-thread-pool-max-size>12345</scheduled-thread-pool-max-size>              
       <security-enabled>false</security-enabled>
       <security-invalidation-interval>5423</security-invalidation-interval>
       <wild-card-routing-enabled>true</wild-card-routing-enabled>

Modified: trunk/tests/config/ConfigurationTest-full-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-full-config.xml	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/config/ConfigurationTest-full-config.xml	2009-04-29 06:26:47 UTC (rev 6615)
@@ -2,7 +2,8 @@
    <configuration>
       <clustered>true</clustered>
       <enable-file-deployment>true</enable-file-deployment>
-      <scheduled-max-pool-size>12345</scheduled-max-pool-size>             
+      <scheduled-thread-pool-max-size>12345</scheduled-thread-pool-max-size>             
+      <thread-pool-max-size>54321</thread-pool-max-size>
       <security-enabled>false</security-enabled>
       <security-invalidation-interval>5423</security-invalidation-interval>
       <wild-card-routing-enabled>true</wild-card-routing-enabled>

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -1,935 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.tests.integration.client;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.MessageHandler;
-import org.jboss.messaging.core.client.impl.ClientConsumerInternal;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.tests.util.ServiceTestBase;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- */
-public class ConsumerWindowSizeTest extends ServiceTestBase
-{
-   private final SimpleString addressA = new SimpleString("addressA");
-
-   private final SimpleString queueA = new SimpleString("queueA");
-
-   private final int TIMEOUT = 5;
-
-   private static final Logger log = Logger.getLogger(ConsumerWindowSizeTest.class);
-
-   private static final boolean isTrace = log.isTraceEnabled();
-
-   /*
-   * tests send window size. we do this by having 2 receivers on the q. since we roundrobin the consumer for delivery we
-   * know if consumer 1 has received n messages then consumer 2 must have also have received n messages or at least up
-   * to its window size
-   * */
-   public void testSendWindowSize() throws Exception
-   {
-      MessagingServer messagingService = createServer(false);
-      ClientSessionFactory cf = createInVMFactory();
-      try
-      {
-         messagingService.start();
-         cf.setBlockOnNonPersistentSend(false);
-         ClientSession sendSession = cf.createSession(false, true, true);
-         ClientSession receiveSession = cf.createSession(false, true, true);
-         sendSession.createQueue(addressA, queueA, false);
-         ClientConsumer receivingConsumer = receiveSession.createConsumer(queueA);
-         ClientMessage cm = sendSession.createClientMessage(false);
-         cm.setDestination(addressA);
-         int encodeSize = cm.getEncodeSize();
-         int numMessage = 100;
-         cf.setConsumerWindowSize(numMessage * encodeSize);
-         ClientSession session = cf.createSession(false, true, true);
-         ClientProducer cp = sendSession.createProducer(addressA);
-         ClientConsumer cc = session.createConsumer(queueA);
-         session.start();
-         receiveSession.start();
-         for (int i = 0; i < numMessage * 4; i++)
-         {
-            cp.send(sendSession.createClientMessage(false));
-         }
-
-         for (int i = 0; i < numMessage * 2; i++)
-         {
-            ClientMessage m = receivingConsumer.receive(5000);
-            assertNotNull(m);
-            m.acknowledge();
-         }
-         receiveSession.close();
-
-         for (int i = 0; i < numMessage * 2; i++)
-         {
-            ClientMessage m = cc.receive(5000);
-            assertNotNull(m);
-            m.acknowledge();
-         }
-
-         session.close();
-         sendSession.close();
-
-         assertEquals(0, getMessageCount(messagingService, queueA.toString()));
-
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testSlowConsumerBufferingOne() throws Exception
-   {
-      MessagingServer server = createServer(false);
-
-      ClientSession sessionB = null;
-      ClientSession session = null;
-
-      try
-      {
-         final int numberOfMessages = 100;
-
-         server.start();
-
-         ClientSessionFactory sf = createInVMFactory();
-         sf.setConsumerWindowSize(1);
-
-         session = sf.createSession(false, true, true);
-
-         SimpleString ADDRESS = addressA;
-
-         session.createQueue(ADDRESS, ADDRESS, true);
-
-         sessionB = sf.createSession(false, true, true);
-         sessionB.start();
-
-         session.start();
-
-         ClientConsumer consNeverUsed = sessionB.createConsumer(ADDRESS);
-
-         ClientConsumer cons1 = session.createConsumer(ADDRESS);
-
-         ClientProducer prod = session.createProducer(ADDRESS);
-
-         for (int i = 0; i < numberOfMessages; i++)
-         {
-            prod.send(createTextMessage(session, "Msg" + i));
-         }
-
-         for (int i = 0; i < numberOfMessages - 1; i++)
-         {
-            ClientMessage msg = cons1.receive(1000);
-            assertNotNull("expected message at i = " + i, msg);
-            msg.acknowledge();
-         }
-
-         ClientMessage msg = consNeverUsed.receive(500);
-         assertNotNull(msg);
-         msg.acknowledge();
-
-         session.close();
-         session = null;
-
-         sessionB.close();
-         sessionB = null;
-
-         assertEquals(0, getMessageCount(server, ADDRESS.toString()));
-
-      }
-      finally
-      {
-         try
-         {
-            if (session != null)
-            {
-               session.close();
-            }
-            if (sessionB != null)
-            {
-               sessionB.close();
-            }
-         }
-         catch (Exception ignored)
-         {
-         }
-
-         if (server.isStarted())
-         {
-            server.stop();
-         }
-      }
-   }
-
-   public void testSlowConsumerNoBuffer() throws Exception
-   {
-      internalTestSlowConsumerNoBuffer(false);
-   }
-
-   public void testSlowConsumerNoBufferLargeMessages() throws Exception
-   {
-      internalTestSlowConsumerNoBuffer(true);
-   }
-
-   private void internalTestSlowConsumerNoBuffer(final boolean largeMessages) throws Exception
-   {
-      MessagingServer server = createServer(false);
-
-      ClientSession sessionB = null;
-      ClientSession session = null;
-
-      try
-      {
-         final int numberOfMessages = 100;
-
-         server.start();
-
-         ClientSessionFactory sf = createInVMFactory();
-         sf.setConsumerWindowSize(0);
-
-         if (largeMessages)
-         {
-            sf.setMinLargeMessageSize(100);
-         }
-
-         session = sf.createSession(false, true, true);
-
-         SimpleString ADDRESS = addressA;
-
-         session.createQueue(ADDRESS, ADDRESS, true);
-
-         sessionB = sf.createSession(false, true, true);
-         sessionB.start();
-
-         session.start();
-
-         ClientConsumerInternal consNeverUsed = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
-
-         ClientConsumer cons1 = session.createConsumer(ADDRESS);
-
-         ClientProducer prod = session.createProducer(ADDRESS);
-
-         for (int i = 0; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = createTextMessage(session, "Msg" + i);
-
-            if (largeMessages)
-            {
-               msg.getBody().writeBytes(new byte[600]);
-            }
-
-            prod.send(msg);
-         }
-
-         for (int i = 0; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = cons1.receive(1000);
-            assertNotNull("expected message at i = " + i, msg);
-            assertEquals("Msg" + i, getTextMessage(msg));
-            msg.acknowledge();
-         }
-
-         assertEquals(0, consNeverUsed.getBufferSize());
-
-         session.close();
-         session = null;
-
-         sessionB.close();
-         sessionB = null;
-
-         assertEquals(0, getMessageCount(server, ADDRESS.toString()));
-
-      }
-      finally
-      {
-         try
-         {
-            if (session != null)
-            {
-               session.close();
-            }
-            if (sessionB != null)
-            {
-               sessionB.close();
-            }
-         }
-         catch (Exception ignored)
-         {
-         }
-
-         if (server.isStarted())
-         {
-            server.stop();
-         }
-      }
-   }
-
-   public void testSlowConsumerNoBuffer2() throws Exception
-   {
-      internalTestSlowConsumerNoBuffer2(false);
-   }
-
-   public void testSlowConsumerNoBuffer2LargeMessages() throws Exception
-   {
-      internalTestSlowConsumerNoBuffer2(true);
-   }
-
-   private void internalTestSlowConsumerNoBuffer2(final boolean largeMessages) throws Exception
-   {
-      MessagingServer server = createServer(false);
-
-      ClientSession session1 = null;
-      ClientSession session2 = null;
-
-      try
-      {
-         final int numberOfMessages = 100;
-
-         server.start();
-
-         ClientSessionFactory sf = createInVMFactory();
-
-         sf.setConsumerWindowSize(0);
-
-         if (largeMessages)
-         {
-            sf.setMinLargeMessageSize(100);
-         }
-
-         session1 = sf.createSession(false, true, true);
-
-         session2 = sf.createSession(false, true, true);
-
-         session1.start();
-
-         session2.start();
-
-         SimpleString ADDRESS = new SimpleString("some-queue");
-
-         session1.createQueue(ADDRESS, ADDRESS, true);
-
-         ClientConsumerInternal cons1 = (ClientConsumerInternal)session1.createConsumer(ADDRESS);
-
-         // Note we make sure we send the messages *before* cons2 is created
-
-         ClientProducer prod = session1.createProducer(ADDRESS);
-
-         for (int i = 0; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = createTextMessage(session1, "Msg" + i);
-            if (largeMessages)
-            {
-               msg.getBody().writeBytes(new byte[600]);
-            }
-            prod.send(msg);
-         }
-
-         ClientConsumerInternal cons2 = (ClientConsumerInternal)session2.createConsumer(ADDRESS);
-
-         for (int i = 0; i < numberOfMessages / 2; i++)
-         {
-            ClientMessage msg = cons1.receive(1000);
-            assertNotNull("expected message at i = " + i, msg);
-
-            String str = getTextMessage(msg);
-            assertEquals("Msg" + i, str);
-
-            msg.acknowledge();
-
-            assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons1.getBufferSize());
-         }
-
-         for (int i = numberOfMessages / 2; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = cons2.receive(1000);
-
-            assertNotNull("expected message at i = " + i, msg);
-
-            assertEquals("Msg" + i, msg.getBody().readString());
-
-            msg.acknowledge();
-
-            assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons2.getBufferSize());
-         }
-
-         session1.close(); // just to make sure everything is flushed and no pending packets on the sending buffer, or
-         // the getMessageCount would fail
-         session2.close();
-
-         session1 = sf.createSession(false, true, true);
-         session1.start();
-         session2 = sf.createSession(false, true, true);
-         session2.start();
-
-         prod = session1.createProducer(ADDRESS);
-
-         assertEquals(0, getMessageCount(server, ADDRESS.toString()));
-
-         // This should also work the other way around
-
-         cons1.close();
-
-         cons2.close();
-
-         cons1 = (ClientConsumerInternal)session1.createConsumer(ADDRESS);
-
-         // Note we make sure we send the messages *before* cons2 is created
-
-         for (int i = 0; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = createTextMessage(session1, "Msg" + i);
-            if (largeMessages)
-            {
-               msg.getBody().writeBytes(new byte[600]);
-            }
-            prod.send(msg);
-         }
-
-         cons2 = (ClientConsumerInternal)session2.createConsumer(ADDRESS);
-
-         // Now we receive on cons2 first
-
-         for (int i = 0; i < numberOfMessages / 2; i++)
-         {
-            ClientMessage msg = cons2.receive(1000);
-            assertNotNull("expected message at i = " + i, msg);
-
-            assertEquals("Msg" + i, msg.getBody().readString());
-
-            msg.acknowledge();
-
-            assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons2.getBufferSize());
-
-         }
-
-         for (int i = numberOfMessages / 2; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = cons1.receive(1000);
-
-            assertNotNull("expected message at i = " + i, msg);
-
-            assertEquals("Msg" + i, msg.getBody().readString());
-
-            msg.acknowledge();
-
-            assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons1.getBufferSize());
-         }
-
-         session1.close();
-         session1 = null;
-         session2.close();
-         session2 = null;
-         assertEquals(0, getMessageCount(server, ADDRESS.toString()));
-
-      }
-      finally
-      {
-         try
-         {
-            if (session1 != null)
-            {
-               session1.close();
-            }
-            if (session2 != null)
-            {
-               session2.close();
-            }
-         }
-         catch (Exception ignored)
-         {
-         }
-
-         if (server.isStarted())
-         {
-            server.stop();
-         }
-      }
-   }
-
-   public void testSlowConsumerOnMessageHandlerNoBuffers() throws Exception
-   {
-      internalTestSlowConsumerOnMessageHandlerNoBuffers(false);
-   }
-
-   public void testSlowConsumerOnMessageHandlerNoBuffersLargeMessage() throws Exception
-   {
-      internalTestSlowConsumerOnMessageHandlerNoBuffers(true);
-   }
-
-   public void internalTestSlowConsumerOnMessageHandlerNoBuffers(final boolean largeMessages) throws Exception
-   {
-
-      MessagingServer server = createServer(false);
-
-      ClientSession sessionB = null;
-      ClientSession session = null;
-
-      try
-      {
-         final int numberOfMessages = 100;
-
-         server.start();
-
-         ClientSessionFactory sf = createInVMFactory();
-         sf.setConsumerWindowSize(0);
-
-         if (largeMessages)
-         {
-            sf.setMinLargeMessageSize(100);
-         }
-
-         session = sf.createSession(false, true, true);
-
-         SimpleString ADDRESS = new SimpleString("some-queue");
-
-         session.createQueue(ADDRESS, ADDRESS, true);
-
-         sessionB = sf.createSession(false, true, true);
-         sessionB.start();
-
-         session.start();
-
-         ClientConsumerInternal consReceiveOneAndHold = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
-
-         final CountDownLatch latchReceived = new CountDownLatch(2);
-
-         final CountDownLatch latchDone = new CountDownLatch(1);
-
-         // It can't close the session while the large message is being read
-         final CountDownLatch latchRead = new CountDownLatch(1);
-
-         // It should receive two messages and then give up
-         class LocalHandler implements MessageHandler
-         {
-            boolean failed = false;
-
-            int count = 0;
-
-            /* (non-Javadoc)
-             * @see org.jboss.messaging.core.client.MessageHandler#onMessage(org.jboss.messaging.core.client.ClientMessage)
-             */
-            public synchronized void onMessage(final ClientMessage message)
-            {
-               try
-               {
-                  String str = getTextMessage(message);
-
-                  failed = failed || !str.equals("Msg" + count);
-
-                  message.acknowledge();
-                  latchReceived.countDown();
-
-                  if (count++ == 1)
-                  {
-                     // it will hold here for a while
-                     if (!latchDone.await(TIMEOUT, TimeUnit.SECONDS)) // a timed wait, so if the test fails, one less
-                     // thread around
-                     {
-                        new Exception("ClientConsuemrWindowSizeTest Handler couldn't receive signal in less than 5 seconds").printStackTrace(); 
-                        failed = true;
-                     }
-
-                     if (largeMessages)
-                     {
-                        message.getBody().readBytes(new byte[600]);
-                     }
-
-                     latchRead.countDown();
-                  }
-               }
-               catch (Exception e)
-               {
-                  e.printStackTrace(); // Hudson / JUnit report
-                  failed = true;
-               }
-            }
-         }
-
-         LocalHandler handler = new LocalHandler();
-
-         ClientConsumer cons1 = session.createConsumer(ADDRESS);
-
-         ClientProducer prod = session.createProducer(ADDRESS);
-
-         for (int i = 0; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = createTextMessage(session, "Msg" + i);
-            if (largeMessages)
-            {
-               msg.getBody().writeBytes(new byte[600]);
-            }
-            prod.send(msg);
-         }
-
-         consReceiveOneAndHold.setMessageHandler(handler);
-
-         assertTrue(latchReceived.await(TIMEOUT, TimeUnit.SECONDS));
-
-         assertEquals(0, consReceiveOneAndHold.getBufferSize());
-
-         for (int i = 2; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = cons1.receive(1000);
-            assertNotNull("expected message at i = " + i, msg);
-            assertEquals("Msg" + i, getTextMessage(msg));
-            msg.acknowledge();
-         }
-
-         assertEquals(0, consReceiveOneAndHold.getBufferSize());
-
-         latchDone.countDown();
-
-         // The test can' t close the session while the message is still being read, or it could interrupt the data
-         assertTrue(latchRead.await(10, TimeUnit.SECONDS));
-
-         session.close();
-         session = null;
-
-         sessionB.close();
-         sessionB = null;
-
-         assertEquals(0, getMessageCount(server, ADDRESS.toString()));
-
-         assertFalse("MessageHandler received a failure", handler.failed);
-
-      }
-      finally
-      {
-         try
-         {
-            if (session != null)
-            {
-               session.close();
-            }
-            if (sessionB != null)
-            {
-               sessionB.close();
-            }
-         }
-         catch (Exception ignored)
-         {
-         }
-
-         if (server.isStarted())
-         {
-            server.stop();
-         }
-      }
-   }
-
-   public void testSlowConsumerOnMessageHandlerBufferOne() throws Exception
-   {
-      internalTestSlowConsumerOnMessageHandlerBufferOne(false);
-   }
-
-   private void internalTestSlowConsumerOnMessageHandlerBufferOne(final boolean largeMessage) throws Exception
-   {
-      MessagingServer server = createServer(false);
-
-      ClientSession sessionB = null;
-      ClientSession session = null;
-
-      try
-      {
-         final int numberOfMessages = 100;
-
-         server.start();
-
-         ClientSessionFactory sf = createInVMFactory();
-         sf.setConsumerWindowSize(1);
-
-         if (largeMessage)
-         {
-            sf.setMinLargeMessageSize(100);
-         }
-
-         session = sf.createSession(false, true, true);
-
-         SimpleString ADDRESS = new SimpleString("some-queue");
-
-         session.createQueue(ADDRESS, ADDRESS, true);
-
-         sessionB = sf.createSession(false, true, true);
-         sessionB.start();
-
-         session.start();
-
-         ClientConsumerInternal consReceiveOneAndHold = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
-
-         final CountDownLatch latchReceived = new CountDownLatch(2);
-         final CountDownLatch latchReceivedBuffered = new CountDownLatch(3);
-
-         final CountDownLatch latchDone = new CountDownLatch(1);
-
-         // It should receive two messages and then give up
-         class LocalHandler implements MessageHandler
-         {
-            boolean failed = false;
-
-            int count = 0;
-
-            /* (non-Javadoc)
-             * @see org.jboss.messaging.core.client.MessageHandler#onMessage(org.jboss.messaging.core.client.ClientMessage)
-             */
-            public synchronized void onMessage(final ClientMessage message)
-            {
-               try
-               {
-                  String str = getTextMessage(message);
-                  if (isTrace)
-                  {
-                     log.trace("Received message " + str);
-                  }
-
-                  failed = failed || !str.equals("Msg" + count);
-
-                  message.acknowledge();
-                  latchReceived.countDown();
-                  latchReceivedBuffered.countDown();
-
-                  if (count++ == 1)
-                  {
-                     // it will hold here for a while
-                     if (!latchDone.await(TIMEOUT, TimeUnit.SECONDS))
-                     {
-                        new Exception("ClientConsuemrWindowSizeTest Handler couldn't receive signal in less than 5 seconds").printStackTrace();
-                        failed = true;
-                     }
-                  }
-               }
-               catch (Exception e)
-               {
-                  e.printStackTrace(); // Hudson / JUnit report
-                  failed = true;
-               }
-            }
-         }
-
-         LocalHandler handler = new LocalHandler();
-
-         ClientProducer prod = session.createProducer(ADDRESS);
-
-         for (int i = 0; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = createTextMessage(session, "Msg" + i);
-            if (largeMessage)
-            {
-               msg.getBody().writeBytes(new byte[600]);
-            }
-            prod.send(msg);
-         }
-
-         consReceiveOneAndHold.setMessageHandler(handler);
-
-         assertTrue(latchReceived.await(TIMEOUT, TimeUnit.SECONDS));
-
-         long timeout = System.currentTimeMillis() + 1000 * TIMEOUT;
-         while (consReceiveOneAndHold.getBufferSize() == 0 && System.currentTimeMillis() < timeout)
-         {
-            Thread.sleep(10);
-         }
-
-         assertEquals(1, consReceiveOneAndHold.getBufferSize());
-
-         ClientConsumer cons1 = session.createConsumer(ADDRESS);
-
-         for (int i = 3; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = cons1.receive(1000);
-            assertNotNull("expected message at i = " + i, msg);
-            String text = getTextMessage(msg);
-            assertEquals("Msg" + i, text);
-            msg.acknowledge();
-         }
-
-         latchDone.countDown();
-
-         assertTrue(latchReceivedBuffered.await(TIMEOUT, TimeUnit.SECONDS));
-
-         session.close();
-         session = null;
-
-         sessionB.close();
-         sessionB = null;
-
-         assertEquals(0, getMessageCount(server, ADDRESS.toString()));
-
-         assertFalse("MessageHandler received a failure", handler.failed);
-
-      }
-      finally
-      {
-         try
-         {
-            if (session != null)
-            {
-               session.close();
-            }
-            if (sessionB != null)
-            {
-               sessionB.close();
-            }
-         }
-         catch (Exception ignored)
-         {
-            ignored.printStackTrace();
-         }
-
-         if (server.isStarted())
-         {
-            server.stop();
-         }
-      }
-   }
-
-   public void testNoWindowRoundRobin() throws Exception
-   {
-      testNoWindowRoundRobin(false);
-   }
-
-   public void testNoWindowRoundRobinLargeMessage() throws Exception
-   {
-      testNoWindowRoundRobin(true);
-   }
-
-   private void testNoWindowRoundRobin(final boolean largeMessages) throws Exception
-   {
-
-      MessagingServer server = createServer(false);
-
-      ClientSession sessionA = null;
-      ClientSession sessionB = null;
-
-      try
-      {
-         final int numberOfMessages = 100;
-
-         server.start();
-
-         ClientSessionFactory sf = createInVMFactory();
-         sf.setConsumerWindowSize(-1);
-
-         if (largeMessages)
-         {
-            sf.setMinLargeMessageSize(100);
-         }
-
-         sessionA = sf.createSession(false, true, true);
-
-         SimpleString ADDRESS = new SimpleString("some-queue");
-
-         sessionA.createQueue(ADDRESS, ADDRESS, true);
-
-         sessionB = sf.createSession(false, true, true);
-
-         sessionA.start();
-         sessionB.start();
-
-         ClientConsumerInternal consA = (ClientConsumerInternal)sessionA.createConsumer(ADDRESS);
-
-         ClientConsumerInternal consB = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
-
-         ClientProducer prod = sessionA.createProducer(ADDRESS);
-
-         for (int i = 0; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = createTextMessage(sessionA, "Msg" + i);
-            if (largeMessages)
-            {
-               msg.getBody().writeBytes(new byte[600]);
-            }
-            prod.send(msg);
-         }
-
-         long timeout = System.currentTimeMillis() + TIMEOUT * 1000;
-
-         boolean foundA = false;
-         boolean foundB = false;
-
-         do
-         {
-            foundA = consA.getBufferSize() == numberOfMessages / 2;
-            foundB = consB.getBufferSize() == numberOfMessages / 2;
-
-            Thread.sleep(10);
-         }
-         while ((!foundA || !foundB) && System.currentTimeMillis() < timeout);
-
-         assertTrue("ConsumerA didn't receive the expected number of messages on buffer (consA=" + consA.getBufferSize() +
-                             ", consB=" +
-                             consB.getBufferSize() +
-                             ") foundA = " +
-                             foundA +
-                             " foundB = " +
-                             foundB,
-                    foundA);
-         assertTrue("ConsumerB didn't receive the expected number of messages on buffer (consA=" + consA.getBufferSize() +
-                             ", consB=" +
-                             consB.getBufferSize() +
-                             ") foundA = " +
-                             foundA +
-                             " foundB = " +
-                             foundB,
-                    foundB);
-
-      }
-      finally
-      {
-         try
-         {
-            if (sessionA != null)
-            {
-               sessionA.close();
-            }
-            if (sessionB != null)
-            {
-               sessionB.close();
-            }
-         }
-         catch (Exception ignored)
-         {
-         }
-
-         if (server.isStarted())
-         {
-            server.stop();
-         }
-      }
-   }
-
-}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -25,6 +25,7 @@
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -23,6 +23,7 @@
 package org.jboss.messaging.tests.integration.journal;
 
 import java.io.File;
+import java.util.concurrent.Executors;
 
 import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
 import org.jboss.messaging.core.journal.SequentialFileFactory;

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlTest.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlTest.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -81,6 +81,7 @@
       assertEquals(conf.isBackup(), serverControl.isBackup());
       assertEquals(conf.getQueueActivationTimeout(), serverControl.getQueueActivationTimeout());
       assertEquals(conf.getScheduledThreadPoolMaxSize(), serverControl.getScheduledThreadPoolMaxSize());
+      assertEquals(conf.getThreadPoolMaxSize(), serverControl.getThreadPoolMaxSize());
       assertEquals(conf.getSecurityInvalidationInterval(), serverControl.getSecurityInvalidationInterval());
       assertEquals(conf.isSecurityEnabled(), serverControl.isSecurityEnabled());
       assertEquals(conf.getInterceptorClassNames(), serverControl.getInterceptorClassNames());
@@ -102,8 +103,7 @@
       assertEquals(conf.getJournalMaxAIO(), serverControl.getJournalMaxAIO());
       assertEquals(conf.getJournalBufferReuseSize(), serverControl.getJournalBufferReuseSize());
       assertEquals(conf.isCreateBindingsDir(), serverControl.isCreateBindingsDir());
-      assertEquals(conf.isCreateJournalDir(), serverControl.isCreateJournalDir());
-      assertEquals(conf.getPagingMaxThreads(), serverControl.getPagingMaxThreads());
+      assertEquals(conf.isCreateJournalDir(), serverControl.isCreateJournalDir());      
       assertEquals(conf.getPagingDirectory(), serverControl.getPagingDirectory());
       assertEquals(conf.getPagingMaxGlobalSizeBytes(), serverControl.getPagingMaxGlobalSizeBytes());
       assertEquals(conf.getPagingGlobalWatermarkSize(), serverControl.getPagingGlobalWatermarkSize());

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlUsingCoreTest.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlUsingCoreTest.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -260,11 +260,6 @@
             return (Long)proxy.retrieveAttributeValue("PagingMaxGlobalSizeBytes");
          }
 
-         public int getPagingMaxThreads()
-         {
-            return (Integer)proxy.retrieveAttributeValue("PagingMaxThreads");
-         }
-
          public long getQueueActivationTimeout()
          {
             return (Long)proxy.retrieveAttributeValue("QueueActivationTimeout");
@@ -274,6 +269,11 @@
          {
             return (Integer)proxy.retrieveAttributeValue("ScheduledThreadPoolMaxSize");
          }
+         
+         public int getThreadPoolMaxSize()
+         {
+            return (Integer)proxy.retrieveAttributeValue("ThreadPoolMaxSize");
+         }
 
          public long getSecurityInvalidationInterval()
          {

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -28,6 +28,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
@@ -251,8 +252,7 @@
       @Override
       protected PagingManager createPagingManager()
       {
-         return new PagingManagerImpl(new FailurePagingStoreFactoryNIO(super.getConfiguration().getPagingDirectory(),
-                                                                       super.getConfiguration().getPagingMaxThreads()),
+         return new PagingManagerImpl(new FailurePagingStoreFactoryNIO(super.getConfiguration().getPagingDirectory()),
                                       super.getStorageManager(),
                                       super.getAddressSettingsRepository(),
                                       super.getConfiguration().getPagingMaxGlobalSizeBytes(),
@@ -268,9 +268,9 @@
           * @param directory
           * @param maxThreads
           */
-         public FailurePagingStoreFactoryNIO(final String directory, final int maxThreads)
+         public FailurePagingStoreFactoryNIO(final String directory)
          {
-            super(directory, maxThreads);
+            super(directory, new OrderedExecutorFactory(Executors.newCachedThreadPool()));
          }
 
          // Constants -----------------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -22,6 +22,8 @@
 
 package org.jboss.messaging.tests.integration.persistence;
 
+import java.util.concurrent.Executors;
+
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.persistence.impl.journal.JournalStorageManager;
 import org.jboss.messaging.core.server.JournalType;
@@ -60,7 +62,7 @@
 
       configuration.setJournalType(JournalType.NIO);
 
-      final JournalStorageManager journal = new JournalStorageManager(configuration);
+      final JournalStorageManager journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
       journal.start();
 
       LargeServerMessage msg = journal.createLargeMessage();

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/journal/RealJournalImplAIOTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/RealJournalImplAIOTest.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/RealJournalImplAIOTest.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -23,6 +23,7 @@
 package org.jboss.messaging.tests.performance.journal;
 
 import java.io.File;
+import java.util.concurrent.Executors;
 
 import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
 import org.jboss.messaging.core.journal.SequentialFileFactory;

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/StorageManagerTimingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/StorageManagerTimingTest.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/StorageManagerTimingTest.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -24,6 +24,7 @@
 
 import java.io.File;
 import java.util.HashMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
@@ -125,7 +126,7 @@
 
       configuration.setJournalType(journalType);
 
-      final JournalStorageManager journal = new JournalStorageManager(configuration);
+      final JournalStorageManager journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
       journal.start();
 
       HashMap<Long, Queue> queues = new HashMap<Long, Queue>();

Modified: trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -24,6 +24,7 @@
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.concurrent.Executors;
 
 import org.jboss.messaging.core.journal.LoadManager;
 import org.jboss.messaging.core.journal.PreparedTransactionInfo;

Modified: trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -23,6 +23,7 @@
 package org.jboss.messaging.tests.stress.journal.remote;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.jboss.messaging.core.journal.LoadManager;

Modified: trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/RealJournalImplAIOTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/RealJournalImplAIOTest.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/RealJournalImplAIOTest.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -23,6 +23,7 @@
 package org.jboss.messaging.tests.timing.core.journal.impl;
 
 import java.io.File;
+import java.util.concurrent.Executors;
 
 import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
 import org.jboss.messaging.core.journal.SequentialFileFactory;

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -49,6 +49,8 @@
       assertEquals(ConfigurationImpl.DEFAULT_QUEUE_ACTIVATION_TIMEOUT, conf.getQueueActivationTimeout());
       
       assertEquals(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, conf.getScheduledThreadPoolMaxSize());
+      
+      assertEquals(ConfigurationImpl.DEFAULT_THREAD_POOL_MAX_SIZE, conf.getThreadPoolMaxSize());
 
       assertEquals(ConfigurationImpl.DEFAULT_SECURITY_INVALIDATION_INTERVAL, conf.getSecurityInvalidationInterval());
 
@@ -115,8 +117,6 @@
 
       assertEquals(ConfigurationImpl.DEFAULT_CREATE_JOURNAL_DIR, conf.isCreateJournalDir());
 
-      assertEquals(ConfigurationImpl.DEFAULT_PAGE_MAX_THREADS, conf.getPagingMaxThreads());
-      
       assertEquals(ConfigurationImpl.DEFAULT_PAGING_DIR, conf.getPagingDirectory());
 
       assertEquals(ConfigurationImpl.DEFAULT_PAGE_MAX_GLOBAL_SIZE, conf.getPagingMaxGlobalSizeBytes());

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -45,7 +45,8 @@
       //Check they match the values from the test file
       assertEquals(true, conf.isClustered());
       assertEquals(true, conf.isEnableFileDeployment());
-      assertEquals(12345, conf.getScheduledThreadPoolMaxSize());      
+      assertEquals(12345, conf.getScheduledThreadPoolMaxSize());
+      assertEquals(54321, conf.getThreadPoolMaxSize());      
       assertEquals(false, conf.isSecurityEnabled());
       assertEquals(5423, conf.getSecurityInvalidationInterval());
       assertEquals(true, conf.isWildcardRoutingEnabled());
@@ -67,8 +68,7 @@
       assertEquals(true, conf.isPersistIDCache());
       assertEquals(12456, conf.getQueueActivationTimeout());
       assertEquals(true, conf.isBackup());
-      assertEquals(true, conf.isPersistDeliveryCountBeforeDelivery());
-      assertEquals(2, conf.getPagingMaxThreads());
+      assertEquals(true, conf.isPersistDeliveryCountBeforeDelivery());      
       assertEquals("pagingdir", conf.getPagingDirectory());
       assertEquals(123, conf.getPagingGlobalWatermarkSize());
       assertEquals(4567, conf.getPagingMaxGlobalSizeBytes());

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueDeployerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueDeployerTest.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueDeployerTest.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -147,6 +147,12 @@
    private class FakeServerControl implements MessagingServerControlMBean
    {
 
+      public int getThreadPoolMaxSize()
+      {
+         // TODO Auto-generated method stub
+         return 0;
+      }
+
       public boolean closeConnectionsForAddress(String ipAddress) throws Exception
       {
          // TODO Auto-generated method stub
@@ -360,12 +366,6 @@
          return 0;
       }
 
-      public int getPagingMaxThreads()
-      {
-         // TODO Auto-generated method stub
-         return 0;
-      }
-
       public long getQueueActivationTimeout()
       {
          // TODO Auto-generated method stub

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileTest.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileTest.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -24,6 +24,7 @@
 
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
 
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingManagerITest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingManagerITest.java	2009-04-29 06:12:41 UTC (rev 6614)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingManagerITest.java	2009-04-29 06:26:47 UTC (rev 6615)
@@ -25,6 +25,7 @@
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.Executors;
 
 import org.jboss.messaging.core.buffers.ChannelBuffers;
 import org.jboss.messaging.core.paging.Page;
@@ -41,6 +42,7 @@
 import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
 import org.jboss.messaging.tests.util.RandomUtil;
 import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.utils.OrderedExecutorFactory;
 import org.jboss.messaging.utils.SimpleString;
 
 /**
@@ -67,7 +69,7 @@
       HierarchicalRepository<AddressSettings> addressSettings = new HierarchicalObjectRepository<AddressSettings>();
       addressSettings.setDefault(new AddressSettings());
       
-      PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getPageDir(), 10),
+      PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getPageDir(), new OrderedExecutorFactory(Executors.newCachedThreadPool())),
                                                             new NullStorageManager(),
                                                             addressSettings,
                                                             -1,




More information about the jboss-cvs-commits mailing list