[jboss-cvs] JBoss Messaging SVN: r4938 - in trunk: src/main/org/jboss/messaging/core/config and 24 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Sep 12 23:51:33 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-09-12 23:51:32 -0400 (Fri, 12 Sep 2008)
New Revision: 4938

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/base/
   trunk/tests/src/org/jboss/messaging/tests/integration/base/IntegrationTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/stress/paging/
   trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java
Modified:
   trunk/src/config/jbm-configuration.xml
   trunk/src/main/org/jboss/messaging/core/config/Configuration.java
   trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
   trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
   trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/server/Queue.java
   trunk/src/main/org/jboss/messaging/core/server/QueueFactory.java
   trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java
   trunk/src/main/org/jboss/messaging/jms/server/management/SubscriptionInfo.java
   trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java
   trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java
   trunk/src/main/org/jboss/messaging/util/TypedProperties.java
   trunk/src/main/org/jboss/messaging/util/VariableLatch.java
   trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
   trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossSessionTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/SubscriptionInfoTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/TopicControlTest.java
Log:
JBMESSAGING-1314 - Completing implementation on paging

Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/config/jbm-configuration.xml	2008-09-13 03:51:32 UTC (rev 4938)
@@ -94,6 +94,12 @@
          </acceptor>
       </remoting-acceptors>
       
+      <!--  Paging configuration -->
+      
+      <paging-directory>data/paging</paging-directory>
+      
+      <max-global-size-bytes>104857600</max-global-size-bytes>
+      
       <!-- Storage configuration -->
 
       <bindings-directory>data/bindings</bindings-directory>

Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -140,5 +140,9 @@
    boolean isCreateJournalDir();
    
    void setCreateJournalDir(boolean create);
+   
+   long getMaxGlobalSizeBytes();
+   
+   void setMaxGlobalSizeBytes(long maxGlobalSize);
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -114,7 +114,12 @@
          
    protected TransportConfiguration backupConnectorConfig;
    
+   // Paging related attributes
+   
+   protected long maxGlobalSize = -1;
          
+   protected String pagingDirectory = DEFAULT_PAGING_DIR;
+
    // Journal related attributes
    
    protected String bindingsDirectory = DEFAULT_BINDINGS_DIRECTORY;
@@ -123,8 +128,6 @@
    
    protected String journalDirectory = DEFAULT_JOURNAL_DIR;
    
-   protected String pagingDirectory = DEFAULT_PAGING_DIR;
-   
    protected boolean createJournalDir = DEFAULT_CREATE_JOURNAL_DIR;
    
    public JournalType journalType = DEFAULT_JOURNAL_TYPE;
@@ -393,6 +396,18 @@
       return this.journalBufferReuseSize;
    }	
 	
+   
+   public long getMaxGlobalSizeBytes()
+   {
+      return this.maxGlobalSize;
+   }
+   
+   public void setMaxGlobalSizeBytes(long maxGlobalSize)
+   {
+      this.maxGlobalSize = maxGlobalSize;      
+   }
+
+   
    public boolean equals(Object other)
    {
       if (this == other)

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -313,6 +313,8 @@
       journalDirectory = getString(e, "journal-directory", journalDirectory);
       
       pagingDirectory = getString(e, "paging-directory", pagingDirectory);
+      
+      maxGlobalSize = getLong(e, "max-global-size-bytes", maxGlobalSize);
 
       createJournalDir = getBoolean(e, "create-journal-dir", createJournalDir);
 

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -40,8 +40,11 @@
 public interface PagingManager extends MessagingComponent
 {
 
+   
+   boolean isGlobalPageMode();
+   
    /** To return the PageStore associated with the address */
-   public PagingStore getPageStore(SimpleString address) throws Exception;
+   PagingStore getPageStore(SimpleString address) throws Exception;
    
    /** An injection point for the PostOffice to inject itself */
    void setPostOffice(PostOffice postOffice);

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -84,7 +84,7 @@
     * @return false if a thread was already started, or if not in page mode
     * @throws Exception 
     */
-   boolean startDepaging() throws Exception;
+   boolean startDepaging();
 
    LastPageRecord getLastRecord();
 

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -23,6 +23,8 @@
 
 package org.jboss.messaging.core.paging;
 
+import java.util.concurrent.Executor;
+
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 
 
@@ -38,6 +40,8 @@
 
    PagingStore newStore(org.jboss.messaging.util.SimpleString destinationName, QueueSettings queueSettings);
    
+   Executor getPagingExecutor();
+   
    void setPagingManager(PagingManager manager);
    
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -73,6 +73,11 @@
    
    // Public --------------------------------------------------------
 
+   public Executor getPagingExecutor()
+   {
+      return this.executor;
+   }
+   
    public PagingStore newStore(final SimpleString destinationName, QueueSettings settings)
    {
       final String destinationDirectory = directory + "/" + destinationName.toString();

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -29,6 +29,8 @@
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.paging.LastPageRecord;
@@ -57,11 +59,21 @@
 
    
    // Constants -----------------------------------------------------
+   private static final long WATERMARK_GLOBAL_PAGE = QueueSettings.DEFAULT_PAGE_SIZE_BYTES;
    
    // Attributes ----------------------------------------------------
    
+   
    private volatile boolean started = false;
    
+   private final long maxGlobalSize;
+   
+   private final AtomicLong globalSize = new AtomicLong(0);
+   
+   private final AtomicBoolean globalMode = new AtomicBoolean(false);
+   
+   private final AtomicBoolean globalDepageRunning = new AtomicBoolean(false);
+   
    private final ConcurrentMap<SimpleString, PagingStore> stores = new ConcurrentHashMap<SimpleString, PagingStore>();
    
    private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
@@ -95,17 +107,25 @@
    // Constructors --------------------------------------------------------------------------------------------------------------------
    
    public PagingManagerImpl(final PagingStoreFactory pagingSPI, StorageManager storageManager, 
-                            final HierarchicalRepository<QueueSettings> queueSettingsRepository)
+                            final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                            final long maxGlobalSize)
    {
       this.pagingSPI = pagingSPI;
       this.queueSettingsRepository = queueSettingsRepository;
       this.storageManager = storageManager;
+      this.maxGlobalSize = maxGlobalSize;
    }
    
    // Public ---------------------------------------------------------------------------------------------------------------------------
    
    // PagingManager implementation -----------------------------------------------------------------------------------------------------
    
+   
+   public boolean isGlobalPageMode()
+   {
+      return globalMode.get();
+   }
+   
    public PagingStore getPageStore(final SimpleString storeName) throws Exception
    {
       validateStarted();
@@ -246,7 +266,18 @@
          ref.getQueue().addLast(ref);
       }
       
-      return pagingStore.getAddressSize() < pagingStore.getMaxSizeBytes(); 
+      
+      if (globalMode.get())
+      {
+         return globalSize.get() < maxGlobalSize -  WATERMARK_GLOBAL_PAGE &&
+                pagingStore.getMaxSizeBytes() <= 0 || pagingStore.getAddressSize() < pagingStore.getMaxSizeBytes();
+      }
+      else
+      {
+         // If Max-size is not configured (-1) it will aways return true, as this method was probably called by global-depage
+         return pagingStore.getMaxSizeBytes() <= 0 || pagingStore.getAddressSize() < pagingStore.getMaxSizeBytes();
+      }
+
    }
    
    public void setLastPage(LastPageRecord lastPage) throws Exception
@@ -262,12 +293,12 @@
    
    public void messageDone(ServerMessage message) throws Exception
    {
-      addSize(message.getDestination(), message.getEncodeSize() * -1);
+      addSize(message.getDestination(), message.getMemoryEstimate() * -1);
    }
    
    public long addSize(final ServerMessage message) throws Exception
    {
-      return addSize(message.getDestination(), message.getEncodeSize());      
+      return addSize(message.getDestination(), message.getMemoryEstimate());      
    }
    
    public boolean page(ServerMessage message, long transactionId)
@@ -358,9 +389,11 @@
       
       final long pageSize = store.getPageSizeBytes();
 
+      
       if (store.isDropWhenMaxSize() && size > 0)
       {
-         if (store.getAddressSize() + size > maxSize)
+         // if destination configured to drop messages && size is over the limit, we return -1 what means drop the message
+         if ((store.getAddressSize() + size > maxSize) || (maxGlobalSize > 0 && (globalSize.get() + size > maxGlobalSize)))
          {
             if (!store.isDroppedMessage())
             {
@@ -377,12 +410,16 @@
       }
       else
       {
+         
+         long currentGlobalSize = globalSize.addAndGet(size);
+         
          final long addressSize = store.addAddressSize(size);
 
          if (size > 0)
          {
-            if (maxSize > 0 && addressSize > maxSize)
+            if ((maxGlobalSize > 0 && (currentGlobalSize > maxGlobalSize)))
             {
+               globalMode.set(true);
                if (store.startPaging())
                {
                   if (isTrace)
@@ -391,9 +428,26 @@
                   }
                }
             }
+            else
+            if ((maxSize > 0 && (addressSize > maxSize)))
+            {
+               if (store.startPaging())
+               {
+                  if (isTrace)
+                  {
+                     trace("Starting paging on " + destination + ", size = " + addressSize + ", maxSize=" + maxSize);
+                  }
+               }
+            }
          }
          else
          {
+            // When in Global mode, we use the default page size as the minimal watermark to start depage
+            if (globalMode.get() && currentGlobalSize < maxGlobalSize - QueueSettings.DEFAULT_PAGE_SIZE_BYTES)
+            {
+               startGlobalDepage();
+            }
+            else
             if ( maxSize > 0 && addressSize < (maxSize - pageSize))
             {
                if (store.startDepaging())
@@ -407,7 +461,71 @@
       }
    }
 
+
+   private void startGlobalDepage()
+   {
+      if (globalDepageRunning.compareAndSet(false, true))
+      {
+         Runnable globalDepageRunnable = new GlobalDepager();
+         pagingSPI.getPagingExecutor().execute(globalDepageRunnable);
+      }
+   }
+
    
    // Inner classes -------------------------------------------------
    
+   
+   class GlobalDepager implements Runnable
+   {
+      public void run()
+      {
+         try
+         {
+            while (globalSize.get() < maxGlobalSize)
+            {
+               boolean depaged = false;
+               // Round robin depaging one page at the time from each destination
+               for (PagingStore store : stores.values())
+               {
+                  if (globalSize.get() < maxGlobalSize)
+                  {
+                     if (store.isPaging())
+                     {
+                        depaged = true;
+                        try
+                        {
+                           store.readPage();
+                        }
+                        catch (Exception e)
+                        {
+                           log.error(e.getMessage(), e);
+                        }
+                     }
+                  }
+               }
+               if (!depaged)
+               {
+                  break;
+               }
+            }
+            
+            if (globalSize.get() < maxGlobalSize)
+            {
+               
+               globalMode.set(false);
+               // Clearing possible messages still in page-mode
+               for (PagingStore store : stores.values())
+               {
+                  store.startDepaging();
+               }
+            }
+         }
+         finally
+         {
+            PagingManagerImpl.this.globalDepageRunning.set(false);
+         }
+      }
+      
+   }
+   
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -44,6 +44,8 @@
 
 /**
  * 
+ * @see PagingStore
+ * 
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
@@ -197,10 +199,10 @@
       }
       page.open();
       PageMessage messages[] = page.read();
-      boolean addressFull = pagingManager.onDepage(page.getPageId(), PagingStoreImpl.this.storeName, PagingStoreImpl.this, messages);
+      boolean addressNotFull = pagingManager.onDepage(page.getPageId(), PagingStoreImpl.this.storeName, PagingStoreImpl.this, messages);
       page.delete();
       
-      return addressFull;
+      return addressNotFull;
 
    }
    
@@ -363,7 +365,7 @@
       }
    }
    
-   public boolean startDepaging() throws Exception
+   public boolean startDepaging()
    {
       lock.readLock().lock();
       try

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -110,6 +110,9 @@
          pagingManager.start();
       }
       
+      // Injecting the postoffice (itself) on queueFactory for paging-control
+      queueFactory.setPostOffice(this);
+      
       loadBindings();
       
       started = true;
@@ -439,8 +442,11 @@
       
       for (SimpleString destination: dests)
       {
-         PagingStore store = pagingManager.getPageStore(destination);
-         store.startDepaging();
+         if (!pagingManager.isGlobalPageMode())
+         {
+            PagingStore store = pagingManager.getPageStore(destination);
+            store.startDepaging();
+         }
       }
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -38,8 +38,6 @@
    
    private boolean durable;
    
-   private int maxSize;
-   
    private int consumerCount;
    
    private int messageCount;
@@ -48,19 +46,19 @@
    
    private SimpleString address;
    
-   public SessionQueueQueryResponseMessage(final boolean durable, final int maxSize, 
+   public SessionQueueQueryResponseMessage(final boolean durable, 
    		final int consumerCount, final int messageCount, final SimpleString filterString,
    		final SimpleString address)
    {
-   	this(durable, maxSize, consumerCount, messageCount, filterString, address, true);
+   	this(durable, consumerCount, messageCount, filterString, address, true);
    }
    
    public SessionQueueQueryResponseMessage()
    {
-      this(false, 0, 0, 0, null, null, false);
+      this(false, 0, 0, null, null, false);
    }
    
-   private SessionQueueQueryResponseMessage(final boolean durable, final int maxSize, 
+   private SessionQueueQueryResponseMessage(final boolean durable, 
    		final int consumerCount, final int messageCount, final SimpleString filterString, final SimpleString address,
    		final boolean exists)
    {
@@ -68,8 +66,6 @@
        
       this.durable = durable;
       
-      this.maxSize = maxSize;
-      
       this.consumerCount = consumerCount;
       
       this.messageCount = messageCount;
@@ -96,11 +92,6 @@
       return durable;
    }
    
-   public int getMaxSize()
-   {
-      return maxSize;
-   }
-   
    public int getConsumerCount()
    {
       return consumerCount;
@@ -125,7 +116,6 @@
    {
       buffer.putBoolean(exists);
       buffer.putBoolean(durable);
-      buffer.putInt(maxSize);
       buffer.putInt(consumerCount);
       buffer.putInt(messageCount);
       buffer.putNullableSimpleString(filterString);
@@ -136,7 +126,6 @@
    {
       exists = buffer.getBoolean();
       durable = buffer.getBoolean();
-      maxSize = buffer.getInt();
       consumerCount = buffer.getInt();
       messageCount = buffer.getInt();
       filterString  = buffer.getNullableSimpleString();
@@ -154,7 +143,6 @@
       
       return super.equals(other) && this.exists == r.exists &&
                this.durable == r.durable &&             
-               this.maxSize == r.maxSize &&
                this.consumerCount == r.consumerCount &&
                this.messageCount == r.messageCount &&
                this.filterString == null ? r.filterString == null : this.filterString.equals(r.filterString) &&

Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -51,8 +51,6 @@
    
    HandleStatus addFirst(MessageReference ref);
    
-   QueueSettings getSettings();
-   
    /**
     * This method is used to add a List of MessageReferences atomically at the head of the list.
     * Useful when cancelling messages and guaranteeing ordering

Modified: trunk/src/main/org/jboss/messaging/core/server/QueueFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/QueueFactory.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/server/QueueFactory.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -23,6 +23,7 @@
 package org.jboss.messaging.core.server;
 
 import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -39,4 +40,10 @@
 {
    Queue createQueue(long persistenceID, SimpleString name, Filter filter,
                      boolean durable);
+   
+   /**
+    * This is required for delete-all-reference to work correctly with paging
+    * @param postOffice
+    */
+   void setPostOffice(PostOffice postOffice);
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -52,5 +52,7 @@
    int getRefCount();
    
    ServerMessage copy();
+   
+   int getMemoryEstimate();
 }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -49,8 +49,6 @@
    private static final Logger log = Logger.getLogger(MessageReferenceImpl.class);
    
    // Attributes ----------------------------------------------------
-
-   private boolean trace = log.isTraceEnabled();
    
    private volatile int deliveryCount;   
    

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -193,7 +193,7 @@
 
       PagingStoreFactory storeFactory = new PagingManagerFactoryNIO(configuration.getPagingDirectory());
 
-      pagingManager = new PagingManagerImpl(storeFactory, storageManager, queueSettingsRepository);
+      pagingManager = new PagingManagerImpl(storeFactory, storageManager, queueSettingsRepository, configuration.getMaxGlobalSizeBytes());
       
       storeFactory.setPagingManager(pagingManager);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -25,6 +25,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
@@ -45,6 +46,9 @@
 
    private final ScheduledExecutorService scheduledExecutor;
    
+   /** This is required for delete-all-reference to work correctly with paging, and controlling global-size */
+   private PostOffice postOffice;
+   
    public QueueFactoryImpl(final ScheduledExecutorService scheduledExecutor,
    	                   	final HierarchicalRepository<QueueSettings> queueSettingsRepository)
    {
@@ -53,12 +57,17 @@
       this.scheduledExecutor = scheduledExecutor;
    }
    
+   public void setPostOffice(PostOffice postOffice)
+   {
+      this.postOffice = postOffice;
+   }
+   
    public Queue createQueue(final long persistenceID, final SimpleString name, final Filter filter,
                             final boolean durable)
    {
       QueueSettings queueSettings = queueSettingsRepository.getMatch(name.toString());
             
-      Queue queue = new QueueImpl(persistenceID, name, filter, queueSettings.isClustered(), durable, queueSettings, scheduledExecutor);
+      Queue queue = new QueueImpl(persistenceID, name, filter, queueSettings.isClustered(), durable, scheduledExecutor, postOffice);
 
       queue.setDistributionPolicy(queueSettings.getDistributionPolicy());
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -88,6 +88,8 @@
    private final boolean durable;
 
    private final ScheduledExecutorService scheduledExecutor;
+   
+   private final PostOffice postOffice;
 
    private final PriorityLinkedList<MessageReference> messageReferences =
       new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
@@ -118,14 +120,12 @@
    
    private final Lock lock = new ReentrantLock(false);
    
-   private final QueueSettings settings;
-   
    private volatile boolean backup;
          
    public QueueImpl(final long persistenceID, final SimpleString name,
          final Filter filter, final boolean clustered, final boolean durable,
-         final QueueSettings settings,
-         final ScheduledExecutorService scheduledExecutor)
+         final ScheduledExecutorService scheduledExecutor,
+         final PostOffice postOffice)
    {
       this.persistenceID = persistenceID;
 
@@ -139,7 +139,7 @@
 
       this.scheduledExecutor = scheduledExecutor;
       
-      this.settings = settings;
+      this.postOffice = postOffice;
 
       direct = true;
    }
@@ -148,11 +148,6 @@
    // -------------------------------------------------------------------
 
 
-   public QueueSettings getSettings()
-   {
-      return this.settings;
-   }
-
    public boolean isClustered()
    {
       return clustered;
@@ -472,7 +467,7 @@
 
    public synchronized void deleteAllReferences(final StorageManager storageManager) throws Exception
    {
-      Transaction tx = new TransactionImpl(storageManager, null);
+      Transaction tx = new TransactionImpl(storageManager, postOffice);
 
       Iterator<MessageReference> iter = messageReferences.iterator();
 
@@ -508,7 +503,7 @@
    {
       boolean deleted = false;
       
-      Transaction tx = new TransactionImpl(storageManager, null);
+      Transaction tx = new TransactionImpl(storageManager, postOffice);
 
       Iterator<MessageReference> iter = messageReferences.iterator();
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -137,7 +137,17 @@
    {
       return refCount.get();
    }
+   
+   public int getMemoryEstimate()
+   {
+      // This is just an estimate...
+      // due to memory alignments and JVM implementation this could be very different from reality
+      return getEncodeSize() +
+             (16 + 4) * 2 +// Each AtomicInteger consumes 16 bytes for the Object and ObjectReference + 4 bytes for the internal integer
+             8; // MessageID
 
+   }
+
    public ServerMessage copy()
    {
       return new ServerMessageImpl(this);

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -1096,11 +1096,8 @@
 
          SimpleString filterString = filter == null ? null : filter
                   .getFilterString();
-
-         QueueSettings settings = queue.getSettings();
-
          // TODO: Remove MAX-SIZE-BYTES from SessionQueueQueryResponse.
-         response = new SessionQueueQueryResponseMessage(queue.isDurable(), settings.getMaxSizeBytes(),
+         response = new SessionQueueQueryResponseMessage(queue.isDurable(), 
                  queue.getConsumerCount(), queue.getMessageCount(),
                  filterString, binding.getAddress());
       }

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -57,8 +57,6 @@
 
    long getSizeBytes();
 
-   int getMaxSizeBytes();
-
    int getMessageCount();
 
    long getScheduledCount();

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/SubscriptionInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/SubscriptionInfo.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/SubscriptionInfo.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -50,13 +50,13 @@
    private static final String SUBSCRIPTION_TYPE_NAME = "SubscriptionInfo";
    private static final String SUBSCRIPTION_TABULAR_TYPE_NAME = "SubscriptionTabularInfo";
    private static final String[] ITEM_NAMES = new String[] { "queueName", "clientID",
-         "name", "durable", "selector", "messageCount", "maxSizeBytes" };
+         "name", "durable", "selector", "messageCount" };
    private static final String[] ITEM_DESCRIPTIONS = new String[] {
          "ID of the subscription", "ClientID of the subscription",
          "name of the subscription", "Is the subscriber durable?", "Selector",
-         "Number of messages", "Maximum size in bytes" };
+         "Number of messages" };
    private static final OpenType[] ITEM_TYPES = new OpenType[] { STRING,
-         STRING, STRING, BOOLEAN, STRING, INTEGER, INTEGER };
+         STRING, STRING, BOOLEAN, STRING, INTEGER};
 
    static
    {
@@ -78,7 +78,6 @@
    private final boolean durable;
    private final String selector;
    private final int messageCount;
-   private final int maxSizeBytes;
 
    // Static --------------------------------------------------------
 
@@ -111,7 +110,7 @@
 
    public SubscriptionInfo(final String queueName, final String clientID,
          final String name, final boolean durable, final String selector,
-         final int messageCount, final int maxSizeBytes)
+         final int messageCount)
    {
       this.queueName = queueName;
       this.clientID = clientID;
@@ -119,7 +118,6 @@
       this.durable = durable;
       this.selector = selector;
       this.messageCount = messageCount;
-      this.maxSizeBytes = maxSizeBytes;
    }
 
    // Public --------------------------------------------------------
@@ -154,17 +152,12 @@
       return messageCount;
    }
 
-   public int getMaxSizeBytes()
-   {
-      return maxSizeBytes;
-   }
-
    public CompositeData toCompositeData()
    {
       try
       {
          return new CompositeDataSupport(TYPE, ITEM_NAMES, new Object[] { queueName,
-               clientID, name, durable, selector, messageCount, maxSizeBytes });
+               clientID, name, durable, selector, messageCount});
       } catch (OpenDataException e)
       {
          return null;

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -149,11 +149,6 @@
       return coreQueue.getDeliveringCount();
    }
 
-   public int getMaxSizeBytes()
-   {
-      return coreQueue.getSettings().getMaxSizeBytes();
-   }
-
    public long getScheduledCount()
    {
       return coreQueue.getScheduledCount();

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -245,14 +245,12 @@
             clientID = pair.a;
             subName = pair.b;
          }
-
-         QueueSettings queueSettings = queue.getSettings();
          
          String filter = queue.getFilter() != null ? queue.getFilter()
                .getFilterString().toString() : null;
          SubscriptionInfo info = new SubscriptionInfo(queue.getName().toString(),
                clientID, subName, queue.isDurable(), filter, queue
-                     .getMessageCount(), queueSettings.getMaxSizeBytes());
+                     .getMessageCount());
          subInfos.add(info);
       }
       return (SubscriptionInfo[]) subInfos.toArray(new SubscriptionInfo[subInfos

Modified: trunk/src/main/org/jboss/messaging/util/TypedProperties.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/TypedProperties.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/util/TypedProperties.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -48,7 +48,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 
@@ -62,7 +61,7 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public class TypedProperties implements EncodingSupport
+public class TypedProperties
 {  
 	private static final Logger log = Logger.getLogger(TypedProperties.class);
 	

Modified: trunk/src/main/org/jboss/messaging/util/VariableLatch.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/VariableLatch.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/util/VariableLatch.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -28,16 +28,17 @@
 
 /**
  * 
- * This class will use the framework provided to by AbstractQueuedSynchronizer.
- * AbstractQueuedSynchronizer is the framework for any sort of concurrent synchronization, such as Semaphores, events, etc, based on AtomicIntegers.
+ * <p>This class will use the framework provided to by AbstractQueuedSynchronizer.</p>
+ * <p>AbstractQueuedSynchronizer is the framework for any sort of concurrent synchronization, such as Semaphores, events, etc, based on AtomicIntegers.</p>
  * 
- * The idea is, instead of providing each user specific Latch/Synchronization, java.util.concurrent provides the framework for reuses, based on an AtomicInteger (getState())
+ * <p>The idea is, instead of providing each user specific Latch/Synchronization, java.util.concurrent provides the framework for reuses, based on an AtomicInteger (getState())</p>
  * 
- * On JBossMessaging we have the requirement of increment and decrement a counter until the user fires a ready event (commit). At that point we just act as a regular countDown.
+ * <p>On JBossMessaging we have the requirement of increment and decrement a counter until the user fires a ready event (commit). At that point we just act as a regular countDown.</p>
  * 
- * Note: This latch is reusable. Once it reaches zero, you can call up again, and reuse it on further waits.
- *       For example: prepareTransaction will wait for the current completions, and further adds will be called on the latch. Later on when commit is called you can reuse the same latch.
+ * <p>Note: This latch is reusable. Once it reaches zero, you can call up again, and reuse it on further waits.</p>
  * 
+ * <p>For example: prepareTransaction will wait for the current completions, and further adds will be called on the latch. Later on when commit is called you can reuse the same latch.</p>
+ * 
  * @author Clebert Suconic
  * */
 public class VariableLatch

Added: trunk/tests/src/org/jboss/messaging/tests/integration/base/IntegrationTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/base/IntegrationTestBase.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/base/IntegrationTestBase.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -0,0 +1,155 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.base;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.jms.client.JBossBytesMessage;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ * 
+ * Base class with basic utilities on starting up a basic server
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class IntegrationTestBase extends UnitTestCase
+{
+   
+   // Constants -----------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   protected static final String ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
+   protected static final String CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+   
+   protected String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/integration-test/journal";
+   protected String bindingsDir = System.getProperty("java.io.tmpdir", "/tmp") + "/integration-test/bindings";
+   protected String pageDir = System.getProperty("java.io.tmpdir", "/tmp") + "/integration-test/page";
+   protected MessagingService messagingService;
+
+   
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+   
+   // Public --------------------------------------------------------
+   
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+   
+   protected void clearData()
+   {
+      File file = new File(journalDir);
+      File file2 = new File(bindingsDir);
+      File file3 = new File(pageDir);
+      deleteDirectory(file);
+      file.mkdirs();
+      deleteDirectory(file2);
+      file2.mkdirs();
+      deleteDirectory(file3);
+      file3.mkdirs();
+   }
+
+
+   protected MessagingService createService(Configuration configuration, Map<String, QueueSettings> settings)
+   {
+      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      MessagingService service = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      
+      for (Map.Entry<String, QueueSettings> setting: settings.entrySet())
+      {
+         service.getServer().getQueueSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+      }
+   
+      
+      return service;
+   }
+
+   protected MessagingService createService()
+   {
+      return createService(createDefaultConfig(), new HashMap<String, QueueSettings>());
+   }
+
+
+   protected Configuration createDefaultConfig()
+   {
+      Configuration configuration = new ConfigurationImpl();
+      configuration.setSecurityEnabled(false);
+      configuration.setJournalMinFiles(2);
+      configuration.setPagingDirectory(pageDir);
+      
+      return configuration;
+   }
+
+
+   protected ClientSessionFactory createFactory()
+   {
+      return new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+   }
+   
+   protected ClientMessage createTextMessage(ClientSession session, String s)
+   {
+      return createTextMessage(session, s, true);
+   }
+
+   protected ClientMessage createTextMessage(ClientSession session, String s, boolean durable)
+   {
+      ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, durable, 0, System.currentTimeMillis(), (byte) 1);
+      message.getBody().putString(s);
+      message.getBody().flip();
+      return message;
+   }
+
+   protected ClientMessage createBytesMessage(ClientSession session, byte[] b, boolean durable)
+   {
+      ClientMessage message = session.createClientMessage(JBossBytesMessage.TYPE, durable, 0, System.currentTimeMillis(), (byte) 1);
+      message.getBody().putBytes(b);
+      message.getBody().flip();
+      return message;
+   }
+
+   
+   
+   // Private -------------------------------------------------------
+   
+   // Inner classes -------------------------------------------------
+   
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -69,7 +69,7 @@
       
       
       PagingManagerImpl managerImpl = 
-         new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir), null, queueSettings);
+         new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir), null, queueSettings, -1);
       managerImpl.start();
       
       PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test"));
@@ -108,12 +108,12 @@
       
       QueueSettings simpleTestSettings = new QueueSettings();
       simpleTestSettings.setDropMessagesWhenFull(true);
-      simpleTestSettings.setMaxSizeBytes(150);
+      simpleTestSettings.setMaxSizeBytes(200);
       
       queueSettings.addMatch("simple-test", simpleTestSettings);
       
       PagingManagerImpl managerImpl = 
-         new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir), null, queueSettings);
+         new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir), null, queueSettings, -1);
       managerImpl.start();
       
       ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(100));

Added: trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -0,0 +1,211 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.stress.paging;
+
+import java.util.HashMap;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.tests.integration.base.IntegrationTestBase;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * TODO: This is an integration-tests that will take some time to run.
+ * Maybe it should be placed on stress-tests?
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class MultipleDestinationPagingTest extends IntegrationTestBase
+{
+   
+   // Constants -----------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   MessagingService service;
+   
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+   
+   // Public --------------------------------------------------------
+   
+   
+   public void testGlobalPage() throws Exception
+   {
+      testPage(true);
+   }
+   
+   public void testRegularPage() throws Exception
+   {
+      testPage(false);
+   }
+   
+   public void testPage(boolean globalPage) throws Exception
+   {
+      Configuration config = createDefaultConfig();
+      
+      HashMap <String, QueueSettings> settings = new HashMap<String, QueueSettings>();
+      
+      if (globalPage)
+      {
+         config.setMaxGlobalSizeBytes(20*1024*1024);
+      }
+      else
+      {
+         QueueSettings setting = new QueueSettings();
+         setting.setMaxSizeBytes(20*1024*1024);
+         settings.put("page-adr", setting);
+      }
+      
+      service = createService(config, settings);
+      service.start();
+
+      ClientSessionFactory factory = createFactory();
+      ClientSession session = null;
+      
+      try
+      {
+   
+         session = factory.createSession(false, false, false, -1, false);
+         
+         SimpleString address = new SimpleString("page-adr");
+         SimpleString queue[] = new SimpleString[]{new SimpleString("queue1"), new SimpleString("queue2")};
+         
+         session.createQueue(address, queue[0], null, true, false);
+         session.createQueue(address, queue[1], null, true, false);
+         
+         ClientProducer prod = session.createProducer(address);
+         
+         ClientMessage message = createBytesMessage(session, new byte[700], false);
+         
+         int NUMBER_OF_MESSAGES = 60000;
+         
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
+         {
+            if (i % 10000 == 0) System.out.println(i);
+            prod.send(message);
+         }
+         
+         session.commit();
+         
+         session.start();
+         
+         int counters[] = new int[2];
+         
+         ClientConsumer consumers[] = new ClientConsumer[] {session.createConsumer(queue[0]), session.createConsumer(queue[1])};
+         
+         int reads = 0;
+         
+         while (true)
+         {
+            int msgs1 = readMessages(session, consumers[0], queue[0]);
+            if (reads ++ == 0)
+            {
+               assertTrue(reads > 0 && reads < NUMBER_OF_MESSAGES);
+            }
+            int msgs2 = readMessages(session, consumers[1], queue[1]);
+            counters[0] += msgs1;
+            counters[1] += msgs2;
+            
+            System.out.println("msgs1 = " + msgs1 + " msgs2 = " + msgs2);
+            
+            if (msgs1 + msgs2 == 0)
+            {
+               break;
+            }
+         }
+
+         consumers[0].close();
+         consumers[1].close();
+         
+         assertEquals(NUMBER_OF_MESSAGES, counters[0]);
+         assertEquals(NUMBER_OF_MESSAGES, counters[1]);
+      }
+      finally
+      {
+         session.close();
+         service.stop();
+      }
+      
+   }
+
+
+
+   private int readMessages(ClientSession session, ClientConsumer consumer, SimpleString queue)
+         throws MessagingException
+   {
+      session.start();
+      int msgs = 0;
+      
+      ClientMessage msg = null;
+      do
+      {
+         msg = consumer.receive(1000);
+         session.acknowledge();
+         if (msg != null)
+         {
+            if (++msgs % 10000 == 0)
+            {
+               System.out.println("received " + msgs);
+               session.commit();
+               
+            }
+         }
+      } while (msg != null);
+      
+      session.commit();
+      
+      return msgs;
+   }
+   
+   
+   
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+   
+   protected void setUp() throws Exception
+   {
+      clearData();
+   }
+   
+   
+   
+   
+   
+   // Private -------------------------------------------------------
+   
+   // Inner classes -------------------------------------------------
+   
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -37,7 +37,6 @@
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.impl.QueueImpl;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.tests.unit.core.server.impl.fakes.FakeConsumer;
 import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.util.SimpleString;
@@ -83,7 +82,7 @@
 
    public void testScheduledNoConsumer() throws Exception
    {
-      Queue queue = new QueueImpl(1, new SimpleString("queue1"), null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, new SimpleString("queue1"), null, false, true, scheduledExecutor, null);
 
       //Send one scheduled
 
@@ -149,7 +148,7 @@
 
    private void testScheduled(boolean direct)
    {
-      Queue queue = new QueueImpl(1, new SimpleString("queue1"), null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, new SimpleString("queue1"), null, false, true, scheduledExecutor, null);
 
       FakeConsumer consumer = null;
 
@@ -246,7 +245,7 @@
 
    public void testDeleteAllReferences() throws Exception
    {
-      Queue queue = new QueueImpl(1, new SimpleString("queue1"), null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, new SimpleString("queue1"), null, false, true, scheduledExecutor, null);
 
       StorageManager storageManager = EasyMock.createStrictMock(StorageManager.class);
 
@@ -337,7 +336,7 @@
    public void testDeliveryScheduled() throws Exception
    {
       Consumer consumer = EasyMock.createStrictMock(Consumer.class);
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
       MessageReference messageReference = generateReference(queue, 1);
       final CountDownLatch countDownLatch = new CountDownLatch(1);
       EasyMock.expect(consumer.handle(messageReference)).andAnswer(new IAnswer<HandleStatus>()
@@ -362,7 +361,7 @@
    public void testDeliveryScheduledBusyConsumer() throws Exception
    {
       Consumer consumer = EasyMock.createStrictMock(Consumer.class);
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
       MessageReference messageReference = generateReference(queue, 1);
       final CountDownLatch countDownLatch = new CountDownLatch(1);
       EasyMock.expect(consumer.handle(messageReference)).andAnswer(new IAnswer<HandleStatus>()

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -68,7 +68,7 @@
       queueSettings.setDefault(new QueueSettings());
 
       PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
-      PagingManagerImpl manager = new PagingManagerImpl(spi, null, queueSettings);
+      PagingManagerImpl manager = new PagingManagerImpl(spi, null, queueSettings, -1);
       
       SimpleString destination = new SimpleString("some-destination");
 
@@ -121,7 +121,7 @@
    public void testMultipleThreadsGetStore() throws Exception
    {
       PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
-      final PagingManagerImpl manager = new PagingManagerImpl(spi, null, repoSettings);
+      final PagingManagerImpl manager = new PagingManagerImpl(spi, null, repoSettings, -1);
       
       final SimpleString destination = new SimpleString("some-destination");
 

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -68,6 +68,8 @@
       
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
 
+      qf.setPostOffice(postOffice);
+
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       EasyMock.replay(pm, qf);
@@ -85,6 +87,8 @@
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
       
+      qf.setPostOffice(postOffice);
+
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       EasyMock.replay(pm, qf);
@@ -116,6 +120,8 @@
       
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
 
+      qf.setPostOffice(postOffice);
+
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       EasyMock.expectLastCall().andAnswer(new LoadBindingsIAnswer(bindingArrayList, null));
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
@@ -161,6 +167,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+      qf.setPostOffice(postOffice);
+
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       EasyMock.expectLastCall().andAnswer(new LoadBindingsIAnswer(bindingArrayList, null));
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
@@ -204,6 +213,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
       
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+      qf.setPostOffice(postOffice);
+
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       EasyMock.expectLastCall().andAnswer(new LoadBindingsIAnswer(bindingArrayList, null));
 
@@ -253,6 +265,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+      qf.setPostOffice(postOffice);
+
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       EasyMock.expectLastCall().andAnswer(new LoadBindingsIAnswer(bindingArrayList, null));
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
@@ -296,6 +311,9 @@
       EasyMock.expect(pgm.getPageStore(address1)).andReturn(store);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+      qf.setPostOffice(postOffice);
+
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       EasyMock.expectLastCall().andAnswer(new LoadBindingsIAnswer(bindingArrayList, dests));
       EasyMock.expect(pm.addDestination(address1)).andReturn(true);
@@ -325,6 +343,9 @@
       PagingStore pgstore = EasyMock.createNiceMock(PagingStore.class);
       
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+      qf.setPostOffice(postOffice);
+
       ArrayList<Binding> bindingArrayList = new ArrayList<Binding>();
       List<SimpleString> dests = new ArrayList<SimpleString>();
       Binding[] bindings = new Binding[100];
@@ -385,6 +406,9 @@
       
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+      qf.setPostOffice(postOffice);
+
       ArrayList<Binding> bindingArrayList = new ArrayList<Binding>();
       List<SimpleString> dests = new ArrayList<SimpleString>();
       Binding[] bindings = new Binding[100];
@@ -442,6 +466,10 @@
       PagingStore pgstore = EasyMock.createNiceMock(PagingStore.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+      qf.setPostOffice(postOffice);
+
+      
       ArrayList<Binding> bindingArrayList = new ArrayList<Binding>();
       List<SimpleString> dests = new ArrayList<SimpleString>();
       Binding[] bindings = new Binding[100];
@@ -500,6 +528,10 @@
       PagingStore pgstore = EasyMock.createNiceMock(PagingStore.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+      qf.setPostOffice(postOffice);
+
+      
       ArrayList<Binding> bindingArrayList = new ArrayList<Binding>();
       List<SimpleString> dests = new ArrayList<SimpleString>();
       Binding[] bindings = new Binding[100];
@@ -736,6 +768,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+      qf.setPostOffice(postOffice);
+
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       EasyMock.expect(pm.addDestination(address)).andReturn(true);
@@ -759,6 +794,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+      qf.setPostOffice(postOffice);
+
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       EasyMock.expect(pm.addDestination(address)).andReturn(true);
@@ -788,6 +826,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+      qf.setPostOffice(postOffice);
+
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       EasyMock.expect(pm.addDestination(address)).andReturn(true);
@@ -814,6 +855,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+      qf.setPostOffice(postOffice);
+
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       EasyMock.expect(pm.addDestination(address)).andReturn(true);
@@ -848,13 +892,16 @@
    {
       SimpleString queueName = new SimpleString("testQueueName");
       StorageManager pm = EasyMock.createStrictMock(StorageManager.class);
-      QueueFactory qf = EasyMock.createStrictMock(QueueFactory.class);
+      QueueFactory qf = EasyMock.createMock(QueueFactory.class);
       Filter filter = EasyMock.createStrictMock(Filter.class);
       Queue queue = EasyMock.createStrictMock(Queue.class);
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+      qf.setPostOffice(postOffice);
+
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       EasyMock.expect(qf.createQueue(-1, queueName, filter, true)).andReturn(queue);
@@ -885,6 +932,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+      qf.setPostOffice(postOffice);
+
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       EasyMock.expect(qf.createQueue(-1, queueName, filter, true)).andReturn(queue);
@@ -925,6 +975,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+      qf.setPostOffice(postOffice);
+
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       EasyMock.expect(qf.createQueue(-1, queueName, filter, false)).andReturn(queue);
@@ -954,6 +1007,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+      qf.setPostOffice(postOffice);
+
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       EasyMock.expect(qf.createQueue(-1, queueName, filter, false)).andReturn(queue);
@@ -991,6 +1047,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+      qf.setPostOffice(postOffice);
+
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       EasyMock.expect(qf.createQueue(-1, queueName, filter, true)).andReturn(queue);
@@ -1028,6 +1087,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+      qf.setPostOffice(postOffice);
+
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       EasyMock.expect(qf.createQueue(-1, queueName, filter, true)).andReturn(queue);
@@ -1062,6 +1124,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+      qf.setPostOffice(postOffice);
+
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       EasyMock.expect(qf.createQueue(-1, queueName, filter, true)).andReturn(queue);
@@ -1110,6 +1175,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+      qf.setPostOffice(postOffice);
+
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       EasyMock.expect(qf.createQueue(-1, queueName, filter, false)).andReturn(queue);
@@ -1142,6 +1210,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+      
+      qf.setPostOffice(postOffice);
+      
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       EasyMock.expect(qf.createQueue(-1, queueName, filter, false)).andReturn(queue);
@@ -1185,6 +1256,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+      
+      qf.setPostOffice(postOffice);
+      
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
@@ -1214,6 +1288,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+      
+      qf.setPostOffice(postOffice);
+      
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       EasyMock.expect(message.getDestination()).andStubReturn(new SimpleString("testtDestination"));
@@ -1242,6 +1319,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null);
+      
+      qf.setPostOffice(postOffice);
+      
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       EasyMock.expect(message.getDestination()).andStubReturn(new SimpleString("testtDestination"));
@@ -1266,6 +1346,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null);
+      
+      qf.setPostOffice(postOffice);
+      
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       SimpleString address = new SimpleString("testtDestination");
@@ -1310,7 +1393,10 @@
       EasyMock.expect(pgm.addSize(EasyMock.isA(ServerMessage.class))).andReturn(-1l);
       
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null);
+
+      qf.setPostOffice(postOffice);
       
+      
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       
@@ -1347,6 +1433,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null);
+      
+      qf.setPostOffice(postOffice);
+      
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       SimpleString address = new SimpleString("testtDestination");
@@ -1381,6 +1470,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null);
+      
+      qf.setPostOffice(postOffice);
+      
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       SimpleString address = new SimpleString("testtDestination");
@@ -1418,6 +1510,9 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
       PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null);
+      
+      qf.setPostOffice(postOffice);
+      
       pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
       pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
       SimpleString address = new SimpleString("testtDestination");
@@ -1474,6 +1569,9 @@
          PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
          PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null);
+         
+         qf.setPostOffice(postOffice);
+         
          pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
          pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
          SimpleString address = new SimpleString("testtDestination");

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -78,7 +78,7 @@
    {
       final long id = 123;
 
-      Queue queue = new QueueImpl(id, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(id, queue1, null, false, true, scheduledExecutor, null);
 
       assertEquals(id, queue.getPersistenceID());
 
@@ -93,29 +93,29 @@
    {
       final SimpleString name = new SimpleString("oobblle");
 
-      Queue queue = new QueueImpl(1, name, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, name, null, false, true, scheduledExecutor, null);
 
       assertEquals(name, queue.getName());
    }
 
    public void testClustered()
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       assertFalse(queue.isClustered());
 
-      queue = new QueueImpl(1, queue1, null, true, true, new QueueSettings(), scheduledExecutor);
+      queue = new QueueImpl(1, queue1, null, true, true, scheduledExecutor, null);
 
       assertTrue(queue.isClustered());
    }
 
    public void testDurable()
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, false, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, false, scheduledExecutor, null);
 
       assertFalse(queue.isDurable());
 
-      queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       assertTrue(queue.isDurable());
    }
@@ -128,7 +128,7 @@
 
       Consumer cons3 = new FakeConsumer();
 
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       assertEquals(0, queue.getConsumerCount());
 
@@ -169,7 +169,7 @@
 
    public void testGetSetDistributionPolicy()
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       assertNotNull(queue.getDistributionPolicy());
 
@@ -184,14 +184,14 @@
 
    public void testGetFilter()
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       assertNull(queue.getFilter());
 
       Filter filter = createMock(Filter.class);
       replay(filter);
 
-      queue = new QueueImpl(1, queue1, filter, false, true, new QueueSettings(), scheduledExecutor);
+      queue = new QueueImpl(1, queue1, filter, false, true, scheduledExecutor, null);
 
       assertEquals(filter, queue.getFilter());
       
@@ -200,7 +200,7 @@
 
    public void testSimpleAddLast()
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       final int numMessages = 10;
 
@@ -219,7 +219,7 @@
 
    public void testSimpleDirectDelivery()
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       FakeConsumer consumer = new FakeConsumer();
 
@@ -247,7 +247,7 @@
 
    public void testSimpleNonDirectDelivery()
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       final int numMessages = 10;
 
@@ -285,7 +285,7 @@
 
    public void testBusyConsumer()
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       FakeConsumer consumer = new FakeConsumer();
 
@@ -329,7 +329,7 @@
 
    public void testBusyConsumerThenAddMoreMessages()
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       FakeConsumer consumer = new FakeConsumer();
 
@@ -396,7 +396,7 @@
 
    public void testAddFirstAddLast()
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       final int numMessages = 10;
 
@@ -451,7 +451,7 @@
 
    public void testChangeConsumersAndDeliver() throws Exception
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       final int numMessages = 10;
 
@@ -605,7 +605,7 @@
 
    public void testConsumerReturningNull()
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       class NullConsumer implements Consumer
       {
@@ -633,7 +633,7 @@
 
    public void testRoundRobinWithQueueing()
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       assertTrue(queue.getDistributionPolicy() instanceof RoundRobinDistributionPolicy);
 
@@ -678,7 +678,7 @@
 
    public void testRoundRobinDirect()
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       assertTrue(queue.getDistributionPolicy() instanceof RoundRobinDistributionPolicy);
 
@@ -721,7 +721,7 @@
 
    public void testDeleteAllReferences() throws Exception
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       StorageManager storageManager = EasyMock.createStrictMock(StorageManager.class);
 
@@ -805,7 +805,7 @@
 
    public void testWithPriorities()
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       final int numMessages = 10;
 
@@ -872,7 +872,7 @@
 
    public void testConsumerWithFilterAddAndRemove()
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       Filter filter = new FakeFilter("fruit", "orange");
 
@@ -881,7 +881,7 @@
 
    public void testList()
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       final int numMessages = 20;
 
@@ -905,7 +905,7 @@
 
    public void testListWithFilter()
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       final int numMessages = 20;
 
@@ -941,7 +941,7 @@
 
    public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       Filter filter = new FakeFilter("fruit", "orange");
 
@@ -1014,7 +1014,7 @@
 
    private void testConsumerWithFilters(boolean direct) throws Exception
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
 
       Filter filter = new FakeFilter("fruit", "orange");
 
@@ -1103,7 +1103,7 @@
    public void testMessageOrder() throws Exception
    {
       Consumer consumer = EasyMock.createStrictMock(Consumer.class);
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1121,7 +1121,7 @@
 
    public void testMessagesAdded() throws Exception
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1134,7 +1134,7 @@
    public void testAddLastWhenLocked() throws Exception
    {
 
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
       MessageReference messageReference = generateReference(queue, 1);
       queue.lock();
       CountDownLatch countDownLatch = new CountDownLatch(1);
@@ -1150,7 +1150,7 @@
    public void testAddLastWhenLockedMultiple() throws Exception
    {
 
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1176,7 +1176,7 @@
    public void testAddFirstWhenLocked() throws Exception
    {
 
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
       MessageReference messageReference = generateReference(queue, 1);
       queue.lock();
       CountDownLatch countDownLatch = new CountDownLatch(1);
@@ -1192,7 +1192,7 @@
    public void testAddFirstWhenLockedMultiple() throws Exception
    {
 
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1218,7 +1218,7 @@
    public void testAddListFirst() throws Exception
    {
       Consumer consumer = EasyMock.createStrictMock(Consumer.class);
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1239,7 +1239,7 @@
    public void testRemoveReferenceWithId() throws Exception
    {
       Consumer consumer = EasyMock.createStrictMock(Consumer.class);
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1260,7 +1260,7 @@
 
    public void testGetReference() throws Exception
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1273,7 +1273,7 @@
 
    public void testGetNonExistentReference() throws Exception
    {
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1287,7 +1287,7 @@
    public void testConsumerRemovedAfterException() throws Exception
    {
       Consumer consumer = EasyMock.createStrictMock(Consumer.class);
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1307,7 +1307,7 @@
    public void testDeliveryAsync() throws Exception
    {
       Consumer consumer = EasyMock.createStrictMock(Consumer.class);
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1336,7 +1336,7 @@
    {
       long messageID = randomLong();
       final SimpleString expiryQueue = new SimpleString("expiryQueue");
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
       MessageReference messageReference = generateReference(queue, messageID);
       StorageManager storageManager = EasyMock.createMock(StorageManager.class);
       EasyMock.expect(storageManager.generateTransactionID()).andReturn(randomLong());
@@ -1394,7 +1394,7 @@
    {
       long messageID = randomLong();
       final SimpleString dlqName = new SimpleString("dlq");
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
       MessageReference messageReference = generateReference(queue, messageID);
       StorageManager storageManager = createMock(StorageManager.class);
       expect(storageManager.generateTransactionID()).andReturn(randomLong());
@@ -1453,7 +1453,7 @@
       long newMessageID = randomLong();
       long tid = randomLong();
       final SimpleString toQueueName = new SimpleString("toQueueName");
-      Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
       Queue toQueue = createMock(Queue.class);
     
       MessageReference messageReference = generateReference(queue, messageID);

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeQueueFactory.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeQueueFactory.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -26,10 +26,10 @@
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.impl.QueueImpl;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -42,11 +42,19 @@
 public class FakeQueueFactory implements QueueFactory
 {
 	private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+	
+	private PostOffice postOffice;
 
 	public Queue createQueue(long persistenceID, SimpleString name, Filter filter,
 			                   boolean durable)
 	{
-		return new QueueImpl(persistenceID, name, filter, false, durable, new QueueSettings(), scheduledExecutor);
+		return new QueueImpl(persistenceID, name, filter, false, durable, scheduledExecutor, postOffice);
 	}
 
+   public void setPostOffice(PostOffice postOffice)
+   {
+      this.postOffice = postOffice;
+      
+   }
+
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -535,13 +535,13 @@
    public void testAckCommit() throws Exception
    {
       //Durable queue
-      Queue queue1 = new QueueImpl(12, new SimpleString("queue1"), null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue1 = new QueueImpl(12, new SimpleString("queue1"), null, false, true, scheduledExecutor, null);
       
       //Durable queue
-      Queue queue2 = new QueueImpl(34, new SimpleString("queue2"), null, false, true, new QueueSettings(), scheduledExecutor);
+      Queue queue2 = new QueueImpl(34, new SimpleString("queue2"), null, false, true, scheduledExecutor, null);
       
       //Non durable queue
-      Queue queue3 = new QueueImpl(65, new SimpleString("queue3"), null, false, false, new QueueSettings(), scheduledExecutor);
+      Queue queue3 = new QueueImpl(65, new SimpleString("queue3"), null, false, false, scheduledExecutor, null);
       
       //Some refs to ack
       

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossSessionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossSessionTest.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossSessionTest.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -672,7 +672,7 @@
 
       // isExists() will return true
       SessionQueueQueryResponseMessage resp = new SessionQueueQueryResponseMessage(
-            false, -1, 0, 1, null, destination.getSimpleAddress());
+            false, 0, 1, null, destination.getSimpleAddress());
       expect(mockClientSession.queueQuery(destination.getSimpleAddress()))
             .andReturn(resp);
       expect(
@@ -1032,7 +1032,7 @@
             .andReturn(bindingResp);
       // already 1 durable subscriber
       SessionQueueQueryResponseMessage queryResp =
-         new SessionQueueQueryResponseMessage(true, -1, 1, 0, null, topic.getSimpleAddress());
+         new SessionQueueQueryResponseMessage(true, 1, 0, null, topic.getSimpleAddress());
       expect(mockClientSession.queueQuery(isA(SimpleString.class))).andReturn(queryResp);
 
       replay(sf, mockClientSession);
@@ -1109,7 +1109,7 @@
             .andReturn(bindingResp);
       // isExists will return true
       SessionQueueQueryResponseMessage queryResp =
-         new SessionQueueQueryResponseMessage(true, -1, 0, 0, null, topic.getSimpleAddress());
+         new SessionQueueQueryResponseMessage(true, 0, 0, null, topic.getSimpleAddress());
       expect(mockClientSession.queueQuery(isA(SimpleString.class))).andReturn(queryResp);
       expect(
             mockClientSession.createConsumer(isA(SimpleString.class),
@@ -1148,7 +1148,7 @@
             .andReturn(bindingResp);
       // isExists will return true
       SessionQueueQueryResponseMessage queryResp =
-         new SessionQueueQueryResponseMessage(true, -1, 0, 0, null, oldTopic.getSimpleAddress());
+         new SessionQueueQueryResponseMessage(true, 0, 0, null, oldTopic.getSimpleAddress());
       expect(mockClientSession.queueQuery(isA(SimpleString.class))).andReturn(queryResp);
       // queue address of the old topic
       mockClientSession.deleteQueue(isA(SimpleString.class));
@@ -1191,7 +1191,7 @@
             .andReturn(bindingResp);
       // isExists will return true
       SessionQueueQueryResponseMessage queryResp =
-         new SessionQueueQueryResponseMessage(true, -1, 0, 0, oldSelector, topic.getSimpleAddress());
+         new SessionQueueQueryResponseMessage(true, 0, 0, oldSelector, topic.getSimpleAddress());
       expect(mockClientSession.queueQuery(isA(SimpleString.class))).andReturn(queryResp);
       // queue address of the old topic
       mockClientSession.deleteQueue(isA(SimpleString.class));
@@ -1377,7 +1377,7 @@
 
       // isExists() will return true
       SessionQueueQueryResponseMessage resp = new SessionQueueQueryResponseMessage(
-            false, -1, -1, 1, null, queueAddress);
+            false, -1, 1, null, queueAddress);
       expect(mockClientSession.queueQuery(queueAddress)).andReturn(resp);
 
       replay(sf, mockClientSession);
@@ -1674,7 +1674,7 @@
       
       // isExists() will return true
       SessionQueueQueryResponseMessage resp = new SessionQueueQueryResponseMessage(
-            false, -1, -1, 1, null, queueAddress);
+            false, -1, 1, null, queueAddress);
       expect(mockClientSession.queueQuery(queueAddress)).andReturn(resp);      
       mockClientSession.removeDestination(queueAddress, false);
       mockClientSession.deleteQueue(queueAddress);
@@ -1724,7 +1724,7 @@
       
       
       SessionQueueQueryResponseMessage resp =
-         new SessionQueueQueryResponseMessage(false, 0, consumerCount, 0, null, queueAddress);
+         new SessionQueueQueryResponseMessage(false, consumerCount, 0, null, queueAddress);
       expect(mockClientSession.queueQuery(queueAddress)).andReturn(resp);
       replay(sf, mockClientSession);
 
@@ -1946,7 +1946,7 @@
       String subName = randomString();
       String clientID = randomString();
       SimpleString queueAddres = new SimpleString(JBossTopic.createQueueNameForDurableSubscription(clientID, subName));      
-      SessionQueueQueryResponseMessage resp = new SessionQueueQueryResponseMessage(false, 0, 0, 0, null, queueAddres);
+      SessionQueueQueryResponseMessage resp = new SessionQueueQueryResponseMessage(false, 0, 0, null, queueAddres);
       expect(mockClientSession.queueQuery(queueAddres)).andReturn(resp );
       mockClientSession.deleteQueue(queueAddres);
       replay(sf, mockClientSession);
@@ -1991,7 +1991,7 @@
       int consumerCount = 1;      
 
       SessionQueueQueryResponseMessage resp =
-         new SessionQueueQueryResponseMessage(true, 0, consumerCount, 0, null, queueAddres);
+         new SessionQueueQueryResponseMessage(true, consumerCount, 0, null, queueAddres);
       expect(mockClientSession.queueQuery(isA(SimpleString.class))).andReturn(resp );
       replay(sf, mockClientSession);
 
@@ -2083,7 +2083,7 @@
 
       // isExists() will return true
       SessionQueueQueryResponseMessage resp = new SessionQueueQueryResponseMessage(
-            false, -1, 0, 1, null, destination.getSimpleAddress());
+            false, 0, 1, null, destination.getSimpleAddress());
       expect(mockClientSession.queueQuery(destination.getSimpleAddress()))
             .andReturn(resp);
       expect(

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/SubscriptionInfoTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/SubscriptionInfoTest.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/SubscriptionInfoTest.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -58,7 +58,6 @@
       assertEquals(expected.isDurable(), actual.get("durable"));
       assertEquals(expected.getSelector(), actual.get("selector"));
       assertEquals(expected.getMessageCount(), actual.get("messageCount"));
-      assertEquals(expected.getMaxSizeBytes(), actual.get("maxSizeBytes"));
    }
 
    // Constructors --------------------------------------------------
@@ -68,8 +67,7 @@
    public void testToCompositeData() throws Exception
    {
       SubscriptionInfo info = new SubscriptionInfo(randomString(), randomString(),
-            randomString(), randomBoolean(), randomString(), randomInt(),
-            randomInt());
+            randomString(), randomBoolean(), randomString(), randomInt());
       CompositeData data = info.toCompositeData();
 
       assertEquals(info, data);
@@ -78,11 +76,9 @@
    public void testToTabularData() throws Exception
    {
       SubscriptionInfo info_1 = new SubscriptionInfo(randomString(), randomString(),
-            randomString(), randomBoolean(), randomString(), randomInt(),
-            randomInt());
+            randomString(), randomBoolean(), randomString(), randomInt());
       SubscriptionInfo info_2 = new SubscriptionInfo(randomString(), randomString(),
-            randomString(), randomBoolean(), randomString(), randomInt(),
-            randomInt());
+            randomString(), randomBoolean(), randomString(), randomInt());
       SubscriptionInfo[] infos = new SubscriptionInfo[] { info_1, info_2 };
 
       TabularData data = SubscriptionInfo.toTabularData(infos);

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/TopicControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/TopicControlTest.java	2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/TopicControlTest.java	2008-09-13 03:51:32 UTC (rev 4938)
@@ -284,11 +284,8 @@
       JBossTopic topic = new JBossTopic(name);
       PostOffice postOffice = createMock(PostOffice.class);
       StorageManager storageManager = createMock(StorageManager.class);
-      
-      QueueSettings settings = new QueueSettings();
 
       Queue durableQueue = createMock(Queue.class);
-      expect(durableQueue.getSettings()).andStubReturn(settings);
       expect(durableQueue.getName()).andStubReturn(
             JBossTopic.createAddressFromName(randomString()));
       expect(durableQueue.getFilter()).andStubReturn(null);
@@ -298,7 +295,6 @@
       expect(bindingForDurableQueue.getQueue()).andStubReturn(durableQueue);
 
       Queue nonDurableQueue = createMock(Queue.class);
-      expect(nonDurableQueue.getSettings()).andStubReturn(settings);
       expect(nonDurableQueue.getName()).andStubReturn(
             JBossTopic.createAddressFromName(randomString()));
       expect(nonDurableQueue.getFilter()).andStubReturn(null);
@@ -342,7 +338,6 @@
       StorageManager storageManager = createMock(StorageManager.class);
 
       Queue durableQueue = createMock(Queue.class);
-      expect(durableQueue.getSettings()).andStubReturn(settings);
       expect(durableQueue.getName()).andStubReturn(
             JBossTopic.createAddressFromName(randomString()));
       expect(durableQueue.getFilter()).andStubReturn(null);
@@ -352,7 +347,6 @@
       expect(bindingForDurableQueue.getQueue()).andStubReturn(durableQueue);
 
       Queue nonDurableQueue = createMock(Queue.class);
-      expect(nonDurableQueue.getSettings()).andStubReturn(settings);
       expect(nonDurableQueue.getName()).andStubReturn(
             JBossTopic.createAddressFromName(randomString()));
       expect(nonDurableQueue.getFilter()).andStubReturn(null);




More information about the jboss-cvs-commits mailing list