[jboss-cvs] JBoss Messaging SVN: r5520 - in trunk: src/main/org/jboss/messaging/core/client/impl and 16 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Dec 11 22:59:02 EST 2008


Author: clebert.suconic at jboss.com
Date: 2008-12-11 22:59:01 -0500 (Thu, 11 Dec 2008)
New Revision: 5520

Modified:
   trunk/src/config/jbm-configuration.xml
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/config/Configuration.java
   trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
   trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/MessageReference.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java
   trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1467 & several tweaks to Paging and ExpiryDelivery, plus a bunch more tests

Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/config/jbm-configuration.xml	2008-12-12 03:59:01 UTC (rev 5520)
@@ -156,6 +156,8 @@
       
       <!--  Paging configuration -->
       
+      <paging-max-threads>10</paging-max-threads>
+      
       <paging-directory>data/paging</paging-directory>
       
       <paging-max-global-size-bytes>104857600</paging-max-global-size-bytes>

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -674,6 +674,8 @@
                                                   "-" +
                                                   getID() +
                                                   ".jbm"));
+         
+         cloneMessage.setFlowControlSize(message.getFlowControlSize());
 
          addBytesBody(cloneMessage, message.getBody().array());
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -111,14 +111,11 @@
 
    public int getFlowControlSize()
    {
-      if (flowControlSize > 0)
+      if (flowControlSize < 0)
       {
-         return flowControlSize;
+         throw new IllegalStateException("Flow Control hasn't been set");
       }
-      else
-      {
-         return getEncodeSize();
-      }
+      return flowControlSize;
    }
 
    public void setFlowControlSize(final int flowControlSize)

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -81,6 +81,7 @@
                }
                else
                {
+                  message.getClientMessage().setFlowControlSize(packet.getPacketSize());
                   clientSession.handleReceiveMessage(message.getConsumerID(), message.getClientMessage());
                }
                

Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -56,6 +56,7 @@
    
    void setQueueActivationTimeout(long timeout);
 
+   
    int getScheduledThreadPoolMaxSize();
 
    void setScheduledThreadPoolMaxSize(int maxSize);
@@ -178,6 +179,10 @@
    
    // Paging Properties --------------------------------------------------------------------
    
+   int getPagingMaxThreads();
+   
+   void setPagingMaxThread(int pagingMaxThreads);
+   
    String getPagingDirectory();
 
    void setPagingDirectory(String dir);

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -65,6 +65,8 @@
 
    public static final String DEFAULT_PAGING_DIR = "data/paging";
    
+   public static final int DEFAULT_PAGE_MAX_THREADS = 10;
+   
    public static final long DEFAULT_PAGE_SIZE = 10 * 1024 * 1024;
    
    public static final String DEFAULT_LARGE_MESSAGES_DIR = "data/largemessages";
@@ -169,6 +171,8 @@
 
    protected String pagingDirectory = DEFAULT_PAGING_DIR;
    
+   protected int pagingMaxThreads = DEFAULT_PAGE_MAX_THREADS;
+   
 
    // File related attributes -----------------------------------------------------------
 
@@ -402,7 +406,18 @@
    {
       return journalType;
    }
+   
+   public int getPagingMaxThreads()
+   {
+      return pagingMaxThreads;
+   }
+   
+   public void setPagingMaxThread(final int pagingMaxThreads)
+   {
+      this.pagingMaxThreads = pagingMaxThreads;
+   }
 
+   
    public void setPagingDirectory(final String dir)
    {
       pagingDirectory = dir;

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -226,6 +226,8 @@
 
       journalDirectory = getString(e, "journal-directory", journalDirectory);
 
+      pagingMaxThreads = getInteger(e, "paging-max-threads", pagingMaxThreads);
+      
       pagingDirectory = getString(e, "paging-directory", pagingDirectory);
 
       pagingMaxGlobalSize = getLong(e, "paging-max-global-size-bytes", pagingMaxGlobalSize);

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -26,6 +26,7 @@
 
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.MessagingComponent;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.util.SimpleString;
@@ -103,7 +104,7 @@
     * Point to inform/restoring Transactions used when the messages were added into paging
     * */
    void addTransaction(PageTransactionInfo pageTransaction);
-   
+
    /**
     * Point to inform/restoring Transactions used when the messages were added into paging
     * */
@@ -123,9 +124,13 @@
    void messageDone(ServerMessage message) throws Exception;
 
    /** To be called when an message is being added to the address.
-    *  @return the current size of the queue, or -1 if the queue is full and it should drop the message */
-   long addSize(ServerMessage message) throws Exception;
+    *  @return false if the address is full */
+   boolean addSize(ServerMessage message) throws Exception;
 
+   void removeSize(ServerMessage message) throws Exception;
+
+   void removeSize(MessageReference reference) throws Exception;
+
    /** Sync current-pages on disk for these destinations */
    void sync(Collection<SimpleString> destinationsToSync) throws Exception;
 

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -82,5 +82,5 @@
     * @return
     * @throws Exception 
     */
-   long addSize(long memoryEstimate) throws Exception;
+   boolean addSize(long memoryEstimate) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -36,6 +36,7 @@
 import org.jboss.messaging.core.paging.PagingStoreFactory;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -194,11 +195,21 @@
       getPageStore(message.getDestination()).addSize(message.getMemoryEstimate() * -1);
    }
 
-   public long addSize(final ServerMessage message) throws Exception
+   public boolean addSize(final ServerMessage message) throws Exception
    {
       return getPageStore(message.getDestination()).addSize(message.getMemoryEstimate());
    }
 
+   public void removeSize(final ServerMessage message) throws Exception
+   {
+      getPageStore(message.getDestination()).addSize(-message.getMemoryEstimate());
+   }
+
+   public void removeSize(final MessageReference reference) throws Exception
+   {
+      getPageStore(reference.getMessage().getDestination()).addSize(-reference.getMemoryEstimate());
+   }
+
    public boolean page(final ServerMessage message, final long transactionId) throws Exception
    {
       // The sync on transactions is done on commit only
@@ -283,7 +294,7 @@
          store.startDepaging(pagingSPI.getGlobalDepagerExecutor());
       }
    }
-   
+
    /* (non-Javadoc)
     * @see org.jboss.messaging.core.paging.PagingManager#getGlobalSize()
     */

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -25,6 +25,8 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.jboss.messaging.core.journal.SequentialFileFactory;
@@ -70,11 +72,15 @@
 
    // Constructors --------------------------------------------------
 
-   public PagingStoreFactoryNIO(final String directory)
+   public PagingStoreFactoryNIO(final String directory, final int maxThreads)
    {
+      System.out.println("maxThreads = " + maxThreads);
       this.directory = directory;
 
-      parentExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-depaging-threads"));
+      parentExecutor = new ThreadPoolExecutor(0, maxThreads,
+                             60L, TimeUnit.SECONDS,
+                             new SynchronousQueue<Runnable>(),
+                             new JBMThreadFactory("JBM-depaging-threads"));
       
       executorFactory = new OrderedExecutorFactory(parentExecutor);
       

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -37,6 +37,7 @@
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.paging.LastPageRecord;
 import org.jboss.messaging.core.paging.Page;
 import org.jboss.messaging.core.paging.PageTransactionInfo;
@@ -215,7 +216,7 @@
       return storeName;
    }
 
-   public long addSize(final long size) throws Exception
+   public boolean addSize(final long size) throws Exception
    {
       final long maxSize = getMaxSizeBytes();
 
@@ -235,11 +236,12 @@
                log.warn("Messages are being dropped on adress " + getStoreName());
             }
 
-            return -1l;
+            return false;
          }
          else
          {
-            return addAddressSize(size);
+            addAddressSize(size);
+            return true;
          }
       }
       else
@@ -284,14 +286,14 @@
             {
 
                log.trace(" globalDepage = " + pagingManager.isGlobalPageMode() +
-                     "\n currentGlobalSize = " +
-                     currentGlobalSize +
-                     "\n defaultPageSize = " +
-                     pagingManager.getDefaultPageSize() +
-                     "\n maxGlobalSize = " +
-                     maxGlobalSize +
-                     "\n maxGlobalSize - defaultPageSize = " +
-                     (maxGlobalSize - pagingManager.getDefaultPageSize()));
+                         "\n currentGlobalSize = " +
+                         currentGlobalSize +
+                         "\n defaultPageSize = " +
+                         pagingManager.getDefaultPageSize() +
+                         "\n maxGlobalSize = " +
+                         maxGlobalSize +
+                         "\n maxGlobalSize - defaultPageSize = " +
+                         (maxGlobalSize - pagingManager.getDefaultPageSize()));
             }
 
             if (pagingManager.isGlobalPageMode() && currentGlobalSize < maxGlobalSize - pagingManager.getDefaultPageSize())
@@ -307,7 +309,7 @@
             }
          }
 
-         return addressSize;
+         return true;
       }
    }
 
@@ -434,7 +436,8 @@
          else
          {
             // startDepaging and clearDepage needs to be atomic.
-            // We can't use writeLock to this operation as writeLock would still be used by another thread, and still being a valid usage
+            // We can't use writeLock for this operation as writeLock would still be used by another thread, and still
+            // be a valid usage
             synchronized (this)
             {
                if (!depaging.get())
@@ -779,8 +782,17 @@
             }
          }
 
-         refsToAdd.addAll(postOffice.route(pagedMessage));
+         List<MessageReference> routedReferences = postOffice.route(pagedMessage);
 
+         Long scheduledDeliveryTime = (Long)pagedMessage.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
+
+         if (scheduledDeliveryTime != null)
+         {
+            postOffice.scheduleReferences(depageTransactionID, scheduledDeliveryTime, routedReferences);
+         }
+
+         refsToAdd.addAll(routedReferences);
+
          if (pagedMessage.getDurableRefCount() != 0)
          {
             storageManager.storeMessageTransactional(depageTransactionID, pagedMessage);
@@ -806,11 +818,7 @@
 
       trace("Depage committed");
 
-      for (MessageReference ref : refsToAdd)
-      {
-         ref.getQueue().addLast(ref);
-      }
-
+      postOffice.deliver(refsToAdd);
    }
 
    /**
@@ -988,7 +996,8 @@
                {
                   readPage();
                }
-               // Note: clearDepage is an atomic operation, it needs to be done even if readPage was not executed because the page was full
+               // Note: clearDepage is an atomic operation, it needs to be done even if readPage was not executed
+               // because the page was full
                if (!clearDepage())
                {
                   followingExecutor.execute(this);

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -51,6 +51,7 @@
  * route to.
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+* @author <a href="mailto:csuconic at redhat.com">Clebert Suconic</a>
  *
  */
 public interface PostOffice extends MessagingComponent
@@ -70,8 +71,11 @@
    
    Binding getBinding(SimpleString queueName);
       
+   /** Deliver references previously routed */
+   void deliver(List<MessageReference> references);
+
    List<MessageReference> route(ServerMessage message) throws Exception;
-      
+
    //For testing only
    Map<SimpleString, List<Binding>> getMappings();
 
@@ -84,4 +88,8 @@
    SendLock getAddressLock(SimpleString address);
    
    DuplicateIDCache getDuplicateIDCache(SimpleString address);
+   
+   void scheduleReferences(long scheduledDeliveryTime,  List<MessageReference> references) throws Exception;
+   
+   void scheduleReferences( long transactionID,  long scheduledDeliveryTime,  List<MessageReference> references) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -37,7 +37,9 @@
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.management.ManagementService;
+import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.AddressManager;
 import org.jboss.messaging.core.postoffice.Binding;
@@ -48,6 +50,7 @@
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.SendLock;
 import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.MessageReferenceImpl;
 import org.jboss.messaging.core.server.impl.SendLockImpl;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -61,6 +64,7 @@
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="csuconic at redhat.com">Clebert Suconic</a>
  */
 public class PostOfficeImpl implements PostOffice
 {
@@ -97,7 +101,7 @@
    private final ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = new ConcurrentHashMap<SimpleString, DuplicateIDCache>();
 
    private final int idCacheSize;
-   
+
    private final boolean persistIDCache;
 
    public PostOfficeImpl(final StorageManager storageManager,
@@ -144,7 +148,7 @@
       this.backup = backup;
 
       this.idCacheSize = idCacheSize;
-      
+
       this.persistIDCache = persistIDCache;
    }
 
@@ -314,82 +318,96 @@
       return addressManager.getBinding(queueName);
    }
 
+   public void deliver(final List<MessageReference> references)
+   {
+      for (MessageReference ref : references)
+      {
+         ref.getQueue().addLast(ref);
+      }
+   }
+   
    public List<MessageReference> route(final ServerMessage message) throws Exception
    {
-      long size = pagingManager.addSize(message);
+      final PagingStore pagingStore = pagingManager.getPageStore(message.getDestination());
 
-      if (size < 0)
+      SimpleString address = message.getDestination();
+
+      if (checkAllowable)
       {
-         return new ArrayList<MessageReference>();
-      }
-      else
-      {
-         SimpleString address = message.getDestination();
-
-         if (checkAllowable)
+         if (!addressManager.containsDestination(address))
          {
-            if (!addressManager.containsDestination(address))
-            {
-               throw new MessagingException(MessagingException.ADDRESS_DOES_NOT_EXIST,
-                                            "Cannot route to address " + address);
-            }
+            throw new MessagingException(MessagingException.ADDRESS_DOES_NOT_EXIST,
+                                         "Cannot route to address " + address);
          }
+      }
 
-         List<Binding> bindings = addressManager.getBindings(address);
+      List<Binding> bindings = addressManager.getBindings(address);
 
-         List<MessageReference> refs = new ArrayList<MessageReference>();
+      List<MessageReference> refs = new ArrayList<MessageReference>();
 
-         if (bindings != null)
+      int refEstimate = 0;
+      if (bindings != null)
+      {
+         Binding theBinding = null;
+
+         long lowestRoutings = -1;
+
+         for (Binding binding : bindings)
          {
-            Binding theBinding = null;
+            Queue queue = binding.getQueue();
 
-            long lowestRoutings = -1;
+            Filter filter = queue.getFilter();
 
-            for (Binding binding : bindings)
+            if (filter == null || filter.match(message))
             {
-               Queue queue = binding.getQueue();
+               if (binding.isFanout())
+               {
+                  // Fanout bindings always get the reference
+                  MessageReference reference = message.createReference(queue);
 
-               Filter filter = queue.getFilter();
+                  refEstimate += reference.getMemoryEstimate();
 
-               if (filter == null || filter.match(message))
+                  refs.add(reference);
+               }
+               else
                {
-                  if (binding.isFanout())
-                  {
-                     // Fanout bindings always get the reference
-                     MessageReference reference = message.createReference(queue);
+                  // We choose the queue with the lowest routings value
+                  // This gives us a weighted round robin, where the weight
+                  // Can be determined from the number of consumers on the queue
+                  long routings = binding.getRoutings();
 
-                     refs.add(reference);
-                  }
-                  else
+                  if (routings < lowestRoutings || lowestRoutings == -1)
                   {
-                     // We choose the queue with the lowest routings value
-                     // This gives us a weighted round robin, where the weight
-                     // Can be determined from the number of consumers on the queue
-                     long routings = binding.getRoutings();
+                     lowestRoutings = routings;
 
-                     if (routings < lowestRoutings || lowestRoutings == -1)
-                     {
-                        lowestRoutings = routings;
-
-                        theBinding = binding;
-                     }
+                     theBinding = binding;
                   }
                }
             }
+         }
 
-            if (theBinding != null)
-            {
-               MessageReference reference = message.createReference(theBinding.getQueue());
+         if (theBinding != null)
+         {
+            MessageReference reference = message.createReference(theBinding.getQueue());
 
-               refs.add(reference);
+            refEstimate += reference.getMemoryEstimate();
 
-               theBinding.incrementRoutings();
-            }
+            refs.add(reference);
 
+            theBinding.incrementRoutings();
          }
 
+      }
+
+      if (refs.size() > 0 && pagingStore.addSize(message.getMemoryEstimate() + refEstimate))
+      {
          return refs;
       }
+      else
+      {
+         return new ArrayList<MessageReference>();
+      }
+
    }
 
    public PagingManager getPagingManager()
@@ -457,7 +475,34 @@
 
       return cache;
    }
+   
+   
+   public void scheduleReferences(final long scheduledDeliveryTime, final List<MessageReference> references) throws Exception
+   {
+      scheduleReferences(-1, scheduledDeliveryTime, references);
+   }
+   
+   public void scheduleReferences(final long transactionID, final long scheduledDeliveryTime, final List<MessageReference> references) throws Exception
+   {
+      for (MessageReference ref : references)
+      {
+         ref.setScheduledDeliveryTime(scheduledDeliveryTime);
 
+         if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
+         {
+            if (transactionID >= 0)
+            {
+               storageManager.updateScheduledDeliveryTimeTransactional(transactionID, ref);
+            }
+            else
+            {
+               storageManager.updateScheduledDeliveryTime(ref);
+            }
+         }
+      }
+   }
+   
+
    // Private -----------------------------------------------------------------
 
    private Binding createBinding(final SimpleString address,
@@ -536,13 +581,13 @@
       Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<SimpleString, Long>>>();
 
       storageManager.loadMessageJournal(this, queues, resourceManager, duplicateIDMap);
-      
-      for (Map.Entry<SimpleString, List<Pair<SimpleString, Long>>> entry: duplicateIDMap.entrySet())
+
+      for (Map.Entry<SimpleString, List<Pair<SimpleString, Long>>> entry : duplicateIDMap.entrySet())
       {
          SimpleString address = entry.getKey();
-         
+
          DuplicateIDCache cache = getDuplicateIDCache(address);
-                           
+
          if (persistIDCache)
          {
             cache.load(entry.getValue());

Modified: trunk/src/main/org/jboss/messaging/core/server/MessageReference.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessageReference.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/server/MessageReference.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -55,6 +55,7 @@
    
    void setScheduledDeliveryTime(long scheduledDeliveryTime);
    
+   int getMemoryEstimate();
 
    int getDeliveryCount();
    

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -37,6 +37,7 @@
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
+import org.jboss.messaging.util.DataConstants;
 import org.jboss.messaging.util.SimpleString;
 
 import java.util.List;
@@ -62,7 +63,11 @@
    private ServerMessage message;
 
    private Queue queue;
+   
+   // Static --------------------------------------------------------
+   
 
+
    // Constructors --------------------------------------------------
 
    public MessageReferenceImpl(final MessageReferenceImpl other, final Queue queue)
@@ -84,12 +89,19 @@
    }
 
    // MessageReference implementation -------------------------------
-
    public MessageReference copy(final Queue queue)
    {
       return new MessageReferenceImpl(this, queue);
    }
 
+   public int getMemoryEstimate()
+   {
+      // From the few tests I have done, deliveryCount and scheduledDelivery will use two longs (because of alignment),
+      // and each of the references (message and queue) will use the equivalent of two longs (because of long pointers).
+      // Anyway, this is just an estimate.
+      return DataConstants.SIZE_LONG * 4;
+   }
+   
    public int getDeliveryCount()
    {
       return deliveryCount;

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -210,7 +210,8 @@
                                                           new JBMThreadFactory("JBM-scheduled-threads"));
       queueFactory = new QueueFactoryImpl(scheduledExecutor, queueSettingsRepository);
 
-      pagingManager = new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory()),
+      pagingManager = new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
+                                                                      configuration.getPagingMaxThreads()),
                                             storageManager,
                                             queueSettingsRepository,
                                             configuration.getPagingMaxGlobalSizeBytes(),
@@ -295,7 +296,7 @@
 
          clusterManager.start();
       }
-      
+
       serverManagement = managementService.registerServer(postOffice,
                                                           storageManager,
                                                           configuration,
@@ -305,7 +306,6 @@
                                                           remotingService,
                                                           this);
 
-
       log.info("Started messaging server");
 
       started = true;
@@ -456,7 +456,7 @@
    {
       return started;
    }
-   
+
    public ClusterManager getClusterManager()
    {
       return clusterManager;
@@ -620,7 +620,7 @@
       if (backupConnectorFactory != null)
       {
          NoCacheConnectionLifeCycleListener listener = new NoCacheConnectionLifeCycleListener();
-         
+
          RemotingConnectionImpl replicatingConnection = (RemotingConnectionImpl)RemotingConnectionImpl.createConnection(backupConnectorFactory,
                                                                                                                         backupConnectorParams,
                                                                                                                         ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -27,6 +27,7 @@
 import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.Consumer;
@@ -901,12 +902,22 @@
       deliveringCount.decrementAndGet();
 
       sizeBytes.addAndGet(-ref.getMessage().getEncodeSize());
+      
+      
+      // TODO: We could optimize this by storing the paging-store for the address on the Queue. We would need to know the Address for the Queue
+      PagingStore store = null;
+      
+      if (pagingManager != null)
+      {
+         store = pagingManager.getPageStore(ref.getMessage().getDestination());
+         store.addSize(-ref.getMemoryEstimate());
+      }
 
       if (ref.getMessage().decrementRefCount() == 0)
       {
-         if (pagingManager != null)
+         if (store != null)
          {
-            pagingManager.messageDone(ref.getMessage());
+            store.addSize(-ref.getMessage().getMemoryEstimate());
          }
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -589,13 +589,15 @@
     */
    private void sendStandardMessage(final MessageReference ref, final ServerMessage message)
    {
+
+      final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount() + 1);
+
       if (availableCredits != null)
       {
-         availableCredits.addAndGet(-message.getEncodeSize());
+         // RequiredBufferSize is the actual size for this packet
+         availableCredits.addAndGet(-packet.getRequiredBufferSize());
       }
 
-      final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount() + 1);
-
       DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID()));
 
       if (result == null)

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -2651,23 +2651,12 @@
                storageManager.storeMessage(msg);               
             }
 
-            // TODO - this code is also duplicated in transactionimpl and in depaging
-            // it should all be centralised
-
-            for (MessageReference ref : refs)
+            if (scheduledDeliveryTime != null)
             {
-               if (scheduledDeliveryTime != null)
-               {
-                  ref.setScheduledDeliveryTime(scheduledDeliveryTime.longValue());
-
-                  if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
-                  {
-                     storageManager.updateScheduledDeliveryTime(ref);
-                  }
-               }
-
-               ref.getQueue().addLast(ref);
+               postOffice.scheduleReferences(scheduledDeliveryTime, refs);
             }
+            
+            postOffice.deliver(refs);
          }
       }
       else

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -17,6 +17,7 @@
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 
 import javax.transaction.xa.Xid;
 
@@ -59,6 +60,12 @@
 
    private final List<MessageReference> acknowledgements = new ArrayList<MessageReference>();
 
+   /** Set of destinations treated as being in page mode by this transaction.
+    *  Once a destination has been considered paging, its messages keep going to paging until commit is called,
+    *  even if the page mode has changed in the meantime; otherwise message order would not be respected. */
+   private final Set<SimpleString> destinationsInPageMode = new HashSet<SimpleString>(); 
+   
+   // FIXME: As part of https://jira.jboss.org/jira/browse/JBMESSAGING-1313
    private final List<ServerMessage> pagedMessages = new ArrayList<ServerMessage>();
 
    private PageTransactionInfo pageTransaction;
@@ -165,9 +172,12 @@
       {
          throw new IllegalStateException("Transaction is in invalid state " + state);
       }
+      
+      SimpleString destination = message.getDestination();
 
-      if (pagingManager.isPaging(message.getDestination()))
+      if (destinationsInPageMode.contains(destination) || pagingManager.isPaging(destination))
       {
+         destinationsInPageMode.add(destination);
          pagedMessages.add(message);
       }
       else
@@ -310,10 +320,7 @@
             storageManager.commit(id);
          }
 
-         for (MessageReference ref : refsToAdd)
-         {
-            ref.getQueue().addLast(ref);
-         }
+         postOffice.deliver(refsToAdd);
 
          // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
          // transaction until all the messages were added to the queue
@@ -433,6 +440,7 @@
    {
       containsPersistent = true;
       refsToAdd.addAll(messages);
+      
       this.acknowledgements.addAll(acknowledgements);
       this.pageTransaction = pageTransaction;
 
@@ -494,7 +502,22 @@
          }
          toCancel.add(ref);
       }
+      
+      HashSet<ServerMessage> messagesAdded = new HashSet<ServerMessage>();
+      
 
+      // We need to remove the sizes added on paging manager, for the messages that only exist here on the Transaction
+      for (MessageReference ref: this.refsToAdd)
+      {
+         messagesAdded.add(ref.getMessage());
+         pagingManager.getPageStore(ref.getMessage().getDestination()).addSize(-ref.getMemoryEstimate());
+      }
+      
+      for (ServerMessage msg: messagesAdded)
+      {
+         pagingManager.removeSize(msg);
+      }
+
       clear();
 
       return toCancel;
@@ -523,18 +546,10 @@
 
          containsPersistent = true;
       }
-
+      
       if (scheduledDeliveryTime != null)
       {
-         for (MessageReference ref : refs)
-         {
-            ref.setScheduledDeliveryTime(scheduledDeliveryTime);
-
-            if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
-            {
-               storageManager.updateScheduledDeliveryTimeTransactional(id, ref);
-            }
-         }
+         postOffice.scheduleReferences(id, scheduledDeliveryTime, refs);
       }
 
       return refs;

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -140,8 +140,6 @@
          ClientSession session = sf.createSession(null, null, false, true, false, preAck, 0);
 
          session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
-         
-         long initialSize = messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
 
          ClientProducer producer = session.createProducer(ADDRESS);
 
@@ -288,7 +286,7 @@
          session.close();
 
          long globalSize = messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
-         assertTrue(globalSize == initialSize || globalSize == 0);
+         assertEquals(0l, globalSize);
          assertEquals(0, messagingService.getServer().getPostOffice().getBinding(ADDRESS).getQueue().getDeliveringCount());
          assertEquals(0, messagingService.getServer().getPostOffice().getBinding(ADDRESS).getQueue().getMessageCount());
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -66,7 +66,7 @@
       HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
       queueSettings.setDefault(new QueueSettings());
 
-      PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getPageDir()),
+      PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getPageDir(), 10),
                                                             null,
                                                             queueSettings,
                                                             -1,
@@ -116,7 +116,7 @@
 
       queueSettings.addMatch("simple-test", simpleTestSettings);
 
-      PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getJournalDir()),
+      PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getJournalDir(), 10),
                                                             null,
                                                             queueSettings,
                                                             -1,
@@ -128,22 +128,20 @@
 
       ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(100));
 
-      long currentSize = managerImpl.addSize(msg);
+      assertTrue(managerImpl.addSize(msg));
 
-      assertTrue(currentSize > 0);
-
-      assertEquals(currentSize, managerImpl.getPageStore(new SimpleString("simple-test")).getAddressSize());
-
       for (int i = 0; i < 10; i++)
       {
-         assertTrue(managerImpl.addSize(msg) < 0);
+         long currentSize = managerImpl.getPageStore(new SimpleString("simple-test")).getAddressSize();
+         assertFalse(managerImpl.addSize(msg));
 
+         // should be unchanged
          assertEquals(currentSize, managerImpl.getPageStore(new SimpleString("simple-test")).getAddressSize());
       }
 
       managerImpl.messageDone(msg);
 
-      assertTrue(managerImpl.addSize(msg) > 0);
+      assertTrue(managerImpl.addSize(msg));
 
       managerImpl.stop();
    }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -34,6 +34,7 @@
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.core.server.MessagingService;
@@ -141,6 +142,10 @@
 
          sf = createInVMFactory();
 
+         System.out.println("Size = " + messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+         
+         assertTrue(messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize() > 0);
+
          session = sf.createSession(null, null, false, true, true, false, 0);
 
          ClientConsumer consumer = session.createConsumer(ADDRESS);
@@ -174,6 +179,9 @@
          consumer.close();
 
          session.close();
+
+         assertEquals(0, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+
       }
       finally
       {
@@ -188,6 +196,467 @@
 
    }
 
+   
+   /**
+    * - Make a destination in page mode
+    * - Add stuff to a transaction
+    * - Consume the entire destination (not in page mode any more)
+    * - Add stuff to a transaction again
+    * - Check order
+    * 
+    */
+   public void testDepageDuringTransaction() throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+      // Paging limits for this test: 100 KiB global maximum, 10 KiB default size
+      config.setPagingMaxGlobalSizeBytes(100 * 1024);
+      config.setPagingDefaultSize(10 * 1024);
+
+      MessagingService messagingService = createService(true, config, new HashMap<String, QueueSettings>());
+
+      messagingService.start();
+
+      final int numberOfIntegers = 256;
+
+      try
+      {
+         ClientSessionFactory sf = createInVMFactory();
+
+         sf.setBlockOnNonPersistentSend(true);
+         sf.setBlockOnPersistentSend(true);
+         sf.setBlockOnAcknowledge(true);
+
+         ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+         session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         ByteBuffer ioBuffer = ByteBuffer.allocate(DataConstants.SIZE_INT * numberOfIntegers);
+         MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+         ClientMessage message = null;
+         // Number of messages sent before the address was reported as paging
+         int numberOfMessages = 0;
+         while (true)
+         {
+            message = session.createClientMessage(true);
+            message.setBody(bodyLocal);
+            
+            // Stop sending messages as soon as we start paging
+            if (messagingService.getServer().getPostOffice().getPagingManager().isPaging(ADDRESS))
+            {
+               break;
+            }
+            numberOfMessages ++;
+            
+
+            producer.send(message);
+         }
+         
+
+         
+         session.start();
+         // Second session, committed explicitly further down; the ten numbered messages are sent through it
+         ClientSession sessionTransacted = sf.createSession(null, null, false, false, false, false, 0);
+
+         ClientProducer producerTransacted = sessionTransacted.createProducer(ADDRESS);
+         
+         for (int i = 0; i< 10; i++)
+         {
+            message = session.createClientMessage(true);
+            message.setBody(bodyLocal);
+            message.putIntProperty(new SimpleString("id"), i);
+            
+            // Consume all previously sent messages half-way through, to provoke a possible out-of-order delivery
+            if (i == 5)
+            {
+               ClientConsumer consumer = session.createConsumer(ADDRESS);
+               for (int j = 0; j < numberOfMessages; j++)
+               {
+                  ClientMessage msg = consumer.receive(1000);
+                  assertNotNull(msg);
+                  msg.acknowledge();
+               }
+               // Nothing more may be delivered here: the transacted session has not been committed yet
+               
+               assertNull(consumer.receive(100));
+               consumer.close();
+            }
+            
+            Integer messageID = (Integer) message.getProperty(new SimpleString("id"));
+            assertNotNull(messageID);
+            assertEquals(i, messageID.intValue());
+
+            producerTransacted.send(message);
+         }
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         assertNull(consumer.receive(100));
+         // The transacted messages are expected only after the commit, in the order they were sent
+         sessionTransacted.commit();
+         
+         sessionTransacted.close();
+
+         for (int i = 0; i < 10; i++)
+         {
+            message = consumer.receive(10000);
+
+            assertNotNull(message);
+            
+            Integer messageID = (Integer) message.getProperty(new SimpleString("id"));
+            
+            assertNotNull(messageID);
+            assertEquals("message received out of order", i, messageID.intValue());
+
+            message.acknowledge();
+         }
+         
+         assertNull(consumer.receive(100));
+
+         consumer.close();
+
+         session.close();
+
+         assertEquals(0, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+
+      }
+      finally
+      {
+         try
+         {
+            messagingService.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
+
+   
+   /** Runs {@link #internalTestPageOnScheduling(boolean)} with the restart flag set to false. */
+   public void testPageOnSchedulingNoRestart() throws Exception
+   {
+      internalTestPageOnScheduling(false);
+   }
+   
+
+   /** Runs {@link #internalTestPageOnScheduling(boolean)} with the restart flag set to true. */
+   public void testPageOnSchedulingRestart() throws Exception
+   {
+      internalTestPageOnScheduling(true);
+   }
+   
+
+   public void internalTestPageOnScheduling(final boolean restart) throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+      // Paging limits for this test: 100 KiB global maximum, 10 KiB default size
+      config.setPagingMaxGlobalSizeBytes(100 * 1024);
+      config.setPagingDefaultSize(10 * 1024);
+
+      MessagingService messagingService = createService(true, config, new HashMap<String, QueueSettings>());
+
+      messagingService.start();
+
+      final int numberOfIntegers = 256;
+
+      final int numberOfMessages = 10000;
+
+      try
+      {
+         ClientSessionFactory sf = createInVMFactory();
+
+         sf.setBlockOnNonPersistentSend(true);
+         sf.setBlockOnPersistentSend(true);
+         sf.setBlockOnAcknowledge(true);
+
+         ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+         session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         ByteBuffer ioBuffer = ByteBuffer.allocate(DataConstants.SIZE_INT * numberOfIntegers);
+
+         ClientMessage message = null;
+
+         MessagingBuffer body = null;
+         // Delivery time (five seconds from now) given to the messages that are sent while the address is paging
+         long scheduledTime = System.currentTimeMillis() + 5000;
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+            for (int j = 1; j <= numberOfIntegers; j++)
+            {
+               bodyLocal.putInt(j);
+            }
+            bodyLocal.flip();
+            // Keep the first body as the reference payload for the comparison in the receive loop
+            if (i == 0)
+            {
+               body = bodyLocal;
+            }
+            message = session.createClientMessage(true);
+            message.setBody(bodyLocal);
+            message.putIntProperty(new SimpleString("id"), i);
+            
+            // Worst-case scenario: only the messages sent while the address is paging are scheduled
+            if (messagingService.getServer().getPostOffice().getPagingManager().isPaging(ADDRESS))
+            {
+               message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, scheduledTime);
+            }
+
+
+            producer.send(message);
+         }
+         // When requested, stop the server, start a fresh one and reconnect before consuming
+         if (restart)
+         {
+            session.close();
+   
+            messagingService.stop();
+   
+            messagingService = createService(true, config, new HashMap<String, QueueSettings>());
+            messagingService.start();
+   
+            sf = createInVMFactory();
+
+            session = sf.createSession(null, null, false, true, true, false, 0);
+         }
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         session.start();
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage message2 = consumer.receive(10000);
+
+            assertNotNull(message2);
+
+            message2.acknowledge();
+
+            assertNotNull(message2);
+            // A message carrying the scheduled-delivery header must not arrive before its scheduled time
+            Long scheduled = (Long)message2.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
+            if (scheduled != null)
+            {
+               assertTrue("Scheduling didn't work", System.currentTimeMillis() >= scheduledTime);
+            }
+            // Compare against the reference payload; on mismatch log both buffers before rethrowing
+            try
+            {
+               assertEqualsByteArrays(body.limit(), body.array(), message2.getBody().array());
+            }
+            catch (AssertionFailedError e)
+            {
+               log.info("Expected buffer:" + dumbBytesHex(body.array(), 40));
+               log.info("Arriving buffer:" + dumbBytesHex(message2.getBody().array(), 40));
+               throw e;
+            }
+         }
+
+         consumer.close();
+
+         session.close();
+
+         assertEquals(0, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+
+      }
+      finally
+      {
+         try
+         {
+            messagingService.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
+
+   public void testRollbackOnSend() throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+      // Paging limits for this test: 100 KiB global maximum, 10 KiB default size
+      config.setPagingMaxGlobalSizeBytes(100 * 1024);
+      config.setPagingDefaultSize(10 * 1024);
+
+      MessagingService messagingService = createService(true, config, new HashMap<String, QueueSettings>());
+
+      messagingService.start();
+
+      final int numberOfIntegers = 256;
+
+      final int numberOfMessages = 10;
+
+      try
+      {
+         ClientSessionFactory sf = createInVMFactory();
+
+         sf.setBlockOnNonPersistentSend(true);
+         sf.setBlockOnPersistentSend(true);
+         sf.setBlockOnAcknowledge(true);
+
+         ClientSession session = sf.createSession(null, null, false, false, true, false, 0);
+
+         session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         // Baseline global size, compared again once the sends have been rolled back
+         long initialSize = messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
+         
+         ByteBuffer ioBuffer = ByteBuffer.allocate(DataConstants.SIZE_INT * numberOfIntegers);
+
+         ClientMessage message = null;
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+            for (int j = 1; j <= numberOfIntegers; j++)
+            {
+               bodyLocal.putInt(j);
+            }
+            bodyLocal.flip();
+
+            message = session.createClientMessage(true);
+            message.setBody(bodyLocal);
+            message.putIntProperty(new SimpleString("id"), i);
+
+            producer.send(message);
+         }
+         // Roll back the sends made above
+         session.rollback();
+         
+         // After the rollback no message is expected on the queue
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         session.start();
+         
+         assertNull(consumer.receive(500));
+         
+         session.close();
+         // The global size must be back at the baseline taken before sending
+         assertEquals(initialSize, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+      }
+      finally
+      {
+         try
+         {
+            messagingService.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
+
+   
+   public void testCommitOnSend() throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+      // Paging limits for this test: 100 KiB global maximum, 10 KiB default size
+      config.setPagingMaxGlobalSizeBytes(100 * 1024);
+      config.setPagingDefaultSize(10 * 1024);
+
+      MessagingService messagingService = createService(true, config, new HashMap<String, QueueSettings>());
+
+      messagingService.start();
+
+      final int numberOfIntegers = 10;
+
+      final int numberOfMessages = 10;
+
+      try
+      {
+         ClientSessionFactory sf = createInVMFactory();
+
+         sf.setBlockOnNonPersistentSend(true);
+         sf.setBlockOnPersistentSend(true);
+         sf.setBlockOnAcknowledge(true);
+
+         ClientSession session = sf.createSession(null, null, false, false, false, false, 0);
+
+         session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         // Baseline global size, compared again once everything has been consumed
+         long initialSize = messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
+         
+         ByteBuffer ioBuffer = ByteBuffer.allocate(DataConstants.SIZE_INT * numberOfIntegers);
+
+         ClientMessage message = null;
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+            for (int j = 1; j <= numberOfIntegers; j++)
+            {
+               bodyLocal.putInt(j);
+            }
+            bodyLocal.flip();
+
+            message = session.createClientMessage(true);
+            message.setBody(bodyLocal);
+            message.putIntProperty(new SimpleString("id"), i);
+
+            producer.send(message);
+         }
+         // Commit the sends; the messages are received and acknowledged below
+         session.commit();
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         session.start();
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = consumer.receive(500);
+            assertNotNull(msg);
+            msg.acknowledge();
+         }
+         
+         // Commit again after acknowledging every message
+         session.commit();
+         
+         session.close();
+         // With all messages consumed, the global size must equal the baseline taken before sending
+         assertEquals(initialSize, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+      }
+      finally
+      {
+         try
+         {
+            messagingService.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -57,14 +57,17 @@
       messagingService.getServer().getQueueSettingsRepository().addMatch(qName.toString(), queueSettings);
       clientSession.createQueue(ea, eq, null, false, false, false);
       clientSession.createQueue(qName, qName, null, false, false, false);
+      
       ClientProducer producer = clientSession.createProducer(qName);
       ClientMessage clientMessage = createTextMessage("heyho!", clientSession);
       clientMessage.setExpiration(System.currentTimeMillis());
       producer.send(clientMessage);
+      
       clientSession.start();
       ClientConsumer clientConsumer = clientSession.createConsumer(qName);
       ClientMessage m = clientConsumer.receive(500);
       assertNull(m);
+      System.out.println("size3 = " + messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
       m = clientConsumer.receive(500);
       assertNull(m);
       clientConsumer.close();
@@ -72,6 +75,10 @@
       m = clientConsumer.receive(500);
       assertNotNull(m);
       assertEquals(m.getBody().getString(), "heyho!");
+      m.acknowledge();
+      
+      // PageSize should be the same as when it started
+      assertEquals(0, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
    }
 
    public void testBasicSendToMultipleQueues() throws Exception
@@ -89,24 +96,49 @@
       ClientProducer producer = clientSession.createProducer(qName);
       ClientMessage clientMessage = createTextMessage("heyho!", clientSession);
       clientMessage.setExpiration(System.currentTimeMillis());
+      
+      System.out.println("initialPageSize = " + messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+      
       producer.send(clientMessage);
+      
+      System.out.println("pageSize after message sent = " + messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+      
       clientSession.start();
       ClientConsumer clientConsumer = clientSession.createConsumer(qName);
       ClientMessage m = clientConsumer.receive(500);
+      
+      System.out.println("pageSize after message received = " + messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+      
       assertNull(m);
+      
       clientConsumer.close();
+      
       clientConsumer = clientSession.createConsumer(eq);
+      
       m = clientConsumer.receive(500);
+      
       assertNotNull(m);
+      
       m.acknowledge();
+      
       assertEquals(m.getBody().getString(), "heyho!");
+      
       clientConsumer.close();
+      
       clientConsumer = clientSession.createConsumer(eq2);
+      
       m = clientConsumer.receive(500);
+      
       assertNotNull(m);
+      
       m.acknowledge();
+      
       assertEquals(m.getBody().getString(), "heyho!");
+      
       clientConsumer.close();
+
+      // PageGlobalSize should be untouched as the message expired
+      assertEquals(0, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
    }
 
    public void testBasicSendToNoQueue() throws Exception
@@ -188,7 +220,8 @@
       messagingService.start();
       // then we create a client as normal
       ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
-      clientSession = sessionFactory.createSession(true, true, false);
+      sessionFactory.setBlockOnAcknowledge(true); // There are assertions over sizes that needs to be done after the ACK was received on server
+      clientSession = sessionFactory.createSession(null, null, false, true, true, false, 0);
    }
 
    @Override

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -328,6 +328,8 @@
       addSettings();
 
       clientSession.createQueue(pageQueue, pageQueue, null, true, true, true);
+      
+      long initialPageSize = this.messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
 
       clientSession.start(xid, XAResource.TMNOFLAGS);
 
@@ -364,6 +366,10 @@
       ClientConsumer pageConsumer = clientSession.createConsumer(pageQueue);
 
       assertNull(pageConsumer.receive(100));
+      
+      long globalSize = this.messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
+      // Management message (from createQueue) will not be taken into account again as it is nonPersistent
+      assertTrue(globalSize == initialPageSize || globalSize == 0l);
 
    }
 

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -32,6 +32,7 @@
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.DuplicateIDCache;
 import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.ServerMessage;
@@ -156,4 +157,26 @@
       return null;
    }
 
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.postoffice.PostOffice#deliver(java.util.List)
+    */
+   public void deliver(List<MessageReference> references)
+   {
+      // No-op: this fake PostOffice does nothing with the references it is handed.
+   }
+
+   /* (non-Javadoc) No-op stub: the body below is empty.
+    * @see org.jboss.messaging.core.postoffice.PostOffice#scheduleReferences(long, java.util.List)
+    */
+   public void scheduleReferences(long scheduledDeliveryTime, List<MessageReference> references) throws Exception
+   {
+   }
+
+   /* (non-Javadoc) No-op stub: the body below is empty.
+    * @see org.jboss.messaging.core.postoffice.PostOffice#scheduleReferences(long, long, java.util.List)
+    */
+   public void scheduleReferences(long transactionID, long scheduledDeliveryTime, List<MessageReference> references) throws Exception
+   {
+   }
+
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -181,6 +181,8 @@
       
       EasyMock.expect(po.route(serverMessage)).andReturn(new ArrayList<MessageReference>());
       
+      po.deliver((List<MessageReference>)EasyMock.anyObject());
+      
       EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(0);
       EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
       
@@ -227,6 +229,8 @@
       EasyMock.expect(queue.isDurable()).andStubReturn(true);
       EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
       EasyMock.expect(sm.generateUniqueID()).andReturn(1l);
+      
+      po.deliver((List<MessageReference>)EasyMock.anyObject());
 
       EasyMock.replay(sm, po, repos, serverMessage, queue, pm);
       messageReference.expire(sm, po, repos);
@@ -282,6 +286,8 @@
       EasyMock.expect(po.route(serverMessage)).andReturn(new ArrayList<MessageReference>());
       EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(0);
       EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
+      
+      po.deliver((List<MessageReference>)EasyMock.anyObject());
 
       EasyMock.replay(sm, po, repos, serverMessage, queue, expQBinding, pm);
       
@@ -334,6 +340,8 @@
       EasyMock.expect(serverMessage.isDurable()).andStubReturn(false);
       EasyMock.expect(serverMessage.getMessageID()).andReturn(messageID);
       queue.referenceAcknowledged(messageReference);
+      
+      postOffice.deliver((List<MessageReference>)EasyMock.anyObject());
 
       EasyMock.replay(queue, toBinding, toQueue, postOffice, persistenceManager, serverMessage, copyMessage, pm);
       

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -42,6 +42,7 @@
 import org.easymock.EasyMock;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
@@ -1259,7 +1260,7 @@
       storageManager.deleteMessageTransactional(EasyMock.anyLong(), EasyMock.eq(queue.getPersistenceID()), EasyMock.eq(messageID));
       storageManager.commit(EasyMock.anyLong());
 
-      PostOffice postOffice = createMock(PostOffice.class);
+      PostOffice postOffice = EasyMock.createNiceMock(PostOffice.class);
       PagingManager pm = EasyMock.createNiceMock(PagingManager.class);
       EasyMock.expect(pm.page(EasyMock.isA(ServerMessage.class))).andStubReturn(false);
       EasyMock.expect(postOffice.getPagingManager()).andStubReturn(pm);
@@ -1319,7 +1320,7 @@
       storageManager.deleteMessageTransactional(anyLong(), eq(queue.getPersistenceID()), eq(messageID));
       storageManager.commit(anyLong());
       
-      PostOffice postOffice = createMock(PostOffice.class);
+      PostOffice postOffice = EasyMock.createNiceMock(PostOffice.class);
       PagingManager pm = EasyMock.createNiceMock(PagingManager.class);
       EasyMock.expect(pm.page(EasyMock.isA(ServerMessage.class))).andStubReturn(false);
       EasyMock.expect(postOffice.getPagingManager()).andStubReturn(pm);
@@ -1382,7 +1383,7 @@
       storageManager.deleteMessageTransactional(EasyMock.anyLong(), EasyMock.eq(queue.getPersistenceID()), EasyMock.eq(messageID));
       storageManager.commit(EasyMock.anyLong());
       
-      PostOffice postOffice = EasyMock.createMock(PostOffice.class);      
+      PostOffice postOffice = EasyMock.createNiceMock(PostOffice.class);      
 
       PagingManager pm = EasyMock.createNiceMock(PagingManager.class);
       EasyMock.expect(pm.page(EasyMock.isA(ServerMessage.class))).andStubReturn(false);
@@ -1422,12 +1423,14 @@
    /**
     * @return
     */
-   private PostOffice createMockPostOffice()
+   private PostOffice createMockPostOffice() throws Exception
    {
+      PagingStore niceStore = EasyMock.createNiceMock(PagingStore.class);
       PagingManager niceManager = EasyMock.createNiceMock(PagingManager.class);
       PostOffice nicePostOffice = EasyMock.createNiceMock(PostOffice.class);
       EasyMock.expect(nicePostOffice.getPagingManager()).andStubReturn(niceManager);
-      EasyMock.replay(niceManager, nicePostOffice);
+      EasyMock.expect(niceManager.getPageStore((SimpleString)EasyMock.anyObject())).andStubReturn(niceStore);
+      EasyMock.replay(niceManager, nicePostOffice, niceStore);
       return nicePostOffice;
    }
 

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java	2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java	2008-12-12 03:59:01 UTC (rev 5520)
@@ -24,6 +24,7 @@
 
 import static org.jboss.messaging.tests.util.RandomUtil.randomXid;
 
+import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -534,8 +535,8 @@
    {
       
       PagingManager pagingManager = EasyMock.createStrictMock(PagingManager.class);
-      PostOffice postOffice = EasyMock.createMock(PostOffice.class);
-      PagingStore pagingStore = EasyMock.createStrictMock(PagingStore.class);
+      PostOffice postOffice = EasyMock.createNiceMock(PostOffice.class);
+      PagingStore pagingStore = EasyMock.createNiceMock(PagingStore.class);
       
       EasyMock.expect(pagingManager.getPageStore((SimpleString)EasyMock.anyObject())).andStubReturn(pagingStore);
       EasyMock.expect(postOffice.getPagingManager()).andStubReturn(pagingManager);
@@ -572,8 +573,6 @@
          
       StorageManager sm = EasyMock.createStrictMock(StorageManager.class);
       
-      PostOffice po= EasyMock.createStrictMock(PostOffice.class);
-      
       final long txID = 123;
       
       EasyMock.expect(sm.generateUniqueID()).andReturn(txID);
@@ -582,7 +581,7 @@
       
       EasyMock.replay(sm, postOffice, pagingManager, pagingStore);
             
-      Transaction tx = new TransactionImpl(sm, po);
+      Transaction tx = new TransactionImpl(sm, postOffice);
       
       assertFalse(tx.isContainsPersistent());
             
@@ -632,12 +631,15 @@
       
       //Expect:
       
-      sm.commit(txID);
+      postOffice.deliver((List<MessageReference>)EasyMock.anyObject());
       
-      pagingManager.messageDone(message1);
+      EasyMock.expectLastCall().anyTimes();
       
-      pagingManager.messageDone(message2);
-      
+      sm.commit(txID);
+
+      EasyMock.expect(pagingManager.getPageStore((SimpleString)EasyMock.anyObject())).andStubReturn(pagingStore);
+      EasyMock.expect(postOffice.getPagingManager()).andStubReturn(pagingManager);
+ 
       EasyMock.replay(sm, postOffice, pagingManager, pagingStore);
       
       tx.commit();
@@ -651,22 +653,30 @@
    
    // Private -------------------------------------------------------------------------
    
-   private Transaction createTransaction()
+   private Transaction createTransaction() throws Exception
    {
    	StorageManager sm = EasyMock.createStrictMock(StorageManager.class);
       
-      PostOffice po = EasyMock.createStrictMock(PostOffice.class);
+      PostOffice po = EasyMock.createNiceMock(PostOffice.class);
       
       final long txID = 123L;
       
       EasyMock.expect(sm.generateUniqueID()).andReturn(txID);
    	
-      EasyMock.replay(sm);
+      EasyMock.replay(sm, po);
       
       Transaction tx = new TransactionImpl(sm, po);
       
-      EasyMock.verify(sm);
+      EasyMock.verify(sm, po);
       
+      EasyMock.reset(po);
+      
+      po.deliver((List<MessageReference>)EasyMock.anyObject());
+      
+      EasyMock.expectLastCall().anyTimes();
+      
+      EasyMock.replay(po);
+      
       return tx;
    }
    




More information about the jboss-cvs-commits mailing list