[jboss-cvs] JBoss Messaging SVN: r4884 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/paging and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Aug 27 23:50:18 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-08-27 23:50:18 -0400 (Wed, 27 Aug 2008)
New Revision: 4884

Modified:
   branches/Branch_JBMESSAGING-1314/src/config/queues.xml
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/LastPageRecord.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/fakes/FakeManagerFactory.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
Few Tweaks, docs and making using the configuration on queue-max-size

Modified: branches/Branch_JBMESSAGING-1314/src/config/queues.xml
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/config/queues.xml	2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/config/queues.xml	2008-08-28 03:50:18 UTC (rev 4884)
@@ -93,6 +93,10 @@
       <clustered>false</clustered>
    </queue-settings>
 
+   <queue-settings match="queuejms.MyQueue">
+      <max-size-bytes>104857600</max-size-bytes>
+   </queue-settings>
+
    <!--default for catch all-->
    <queue-settings match="*">
       <clustered>false</clustered>

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/LastPageRecord.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/LastPageRecord.java	2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/LastPageRecord.java	2008-08-28 03:50:18 UTC (rev 4884)
@@ -28,7 +28,7 @@
 
 /**
  * 
- * Stores the last pageID processed during depage, to detect duplications during the delete
+ * Stores the last pageID processed during depage, to detect duplications after the delete
  * 
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
@@ -46,10 +46,10 @@
    
    // Public --------------------------------------------------------
    
-   /** Internal record with the primary key, used on the journal/database*/
+   /** Internal field with the primary key, used on the journal/database */
    long getRecordId();
 
-   /** Internal record with the primary key, used on the journal/database*/
+   /** Internal field with the primary key, used on the journal/database */
    void setRecordId(long recordId);
 
    SimpleString getDestination();

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java	2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java	2008-08-28 03:50:18 UTC (rev 4884)
@@ -97,7 +97,7 @@
     * Or else the record could live forever on the journal. 
     * @throws Exception 
     * */
-   void clearLastRecord(LastPageRecord lastRecord) throws Exception;
+   void clearLastPageRecord(LastPageRecord lastRecord) throws Exception;
    
    
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-28 03:50:18 UTC (rev 4884)
@@ -44,6 +44,13 @@
    
    SimpleString getStoreName();
    
+   /** Maximum number of bytes allowed in memory */ 
+   long getMaxSizeBytes();
+   
+   long getQueueSize();
+   
+   long addQueueSize(long add);
+   
    /** @return true if paging was started, or false if paging was already started before this call */
    boolean startPaging() throws Exception;
    

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java	2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java	2008-08-28 03:50:18 UTC (rev 4884)
@@ -23,8 +23,10 @@
 
 package org.jboss.messaging.core.paging;
 
+import org.jboss.messaging.core.settings.impl.QueueSettings;
 
 
+
 /**
  * The integration point between the PagingManger and the File System (aka SequentialFiles)
  * 
@@ -34,6 +36,6 @@
 public interface PagingStoreFactory
 {
 
-   PagingStore newStore(org.jboss.messaging.util.SimpleString destinationName);
+   PagingStore newStore(org.jboss.messaging.util.SimpleString destinationName, QueueSettings queueSettings);
    
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java	2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java	2008-08-28 03:50:18 UTC (rev 4884)
@@ -60,7 +60,6 @@
    // Attributes ----------------------------------------------------
    
    private final int pageId;
-   private final PagingStoreFactory storeFactory;
    private final AtomicInteger numberOfMessages = new AtomicInteger(0);
    private final SequentialFile file;
    private final SequentialFileFactory fileFactory;
@@ -71,12 +70,11 @@
    
    // Constructors --------------------------------------------------
    
-   public PageImpl(final SequentialFileFactory factory, final SequentialFile file, PagingStoreFactory storeFactory, final int pageId) throws Exception
+   public PageImpl(final SequentialFileFactory factory, final SequentialFile file,final int pageId) throws Exception
    {
       this.pageId = pageId;
       this.file = file;
       this.fileFactory = factory;
-      this.storeFactory = storeFactory;
       if (factory.isSupportsCallbacks())
       {
          callback = new PagingCallback();

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java	2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java	2008-08-28 03:50:18 UTC (rev 4884)
@@ -29,6 +29,7 @@
 import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
 import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.paging.PagingStoreFactory;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -59,13 +60,13 @@
    
    // Public --------------------------------------------------------
 
-   public PagingStore newStore(final SimpleString destinationName)
+   public PagingStore newStore(final SimpleString destinationName, QueueSettings settings)
    {
       final String destinationDirectory = directory + "/" + destinationName.toString();
       File destinationFile = new File(destinationDirectory);
       destinationFile.mkdirs();
       
-      return new PagingStoreImpl(newFileFactory(destinationDirectory), this, destinationName, pageSize);
+      return new PagingStoreImpl(newFileFactory(destinationDirectory), destinationName, pageSize, settings);
    }
 
    // Package protected ---------------------------------------------

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-08-28 03:50:18 UTC (rev 4884)
@@ -29,6 +29,8 @@
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.paging.PagingStoreFactory;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -46,15 +48,18 @@
    
    private final ConcurrentMap<SimpleString, PagingStore> stores = new ConcurrentHashMap<SimpleString, PagingStore>();
    
+   private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+   
    private final PagingStoreFactory pagingSPI;
    
    // Static --------------------------------------------------------
    
    // Constructors --------------------------------------------------
    
-   public PagingManagerImpl(final PagingStoreFactory pagingSPI)
+   public PagingManagerImpl(final PagingStoreFactory pagingSPI, final HierarchicalRepository<QueueSettings> queueSettingsRepository)
    {
       this.pagingSPI = pagingSPI;
+      this.queueSettingsRepository = queueSettingsRepository;
    }
    
    // Public --------------------------------------------------------
@@ -113,7 +118,7 @@
    
    private PagingStore newStore(final SimpleString destinationName)
    {
-      return pagingSPI.newStore(destinationName);
+      return pagingSPI.newStore(destinationName, this.queueSettingsRepository.getMatch(destinationName.toString()));
    }
    
    private void validateStarted()

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-28 03:50:18 UTC (rev 4884)
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -37,7 +38,7 @@
 import org.jboss.messaging.core.paging.Page;
 import org.jboss.messaging.core.paging.PageMessage;
 import org.jboss.messaging.core.paging.Pager;
-import org.jboss.messaging.core.paging.PagingStoreFactory;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -59,28 +60,28 @@
    
    private final SimpleString storeName;
    
-   private final PagingStoreFactory storeFactory;
-   
    private final SequentialFileFactory fileFactory;
    
    private final long maxPageSize;
    
-   private volatile Thread dequeueThread;
+   private final QueueSettings queueSettings;
    
+   // Bytes consumed by the queue on the memory
+   private final AtomicLong sizeInBytes = new AtomicLong();
    
+   private volatile Thread dequeueThread;
    private volatile int numberOfPages;
    private volatile int firstPageId = Integer.MAX_VALUE;
    private volatile int currentPageId;
    private volatile Page currentPage;
 
-   // This is supposed to perform better than synchronized methods
-   // synchronizedBlockLock protects opening/closing and messing up with IDs
-   private final Semaphore synchronizedBlockLock = new Semaphore(1); 
+   // positioningGlobalLock protects opening/closing and messing up with positions (currentPage and IDs)
+   private final Semaphore positioningGlobalLock = new Semaphore(1); 
 
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile boolean initialized = false;
 
-   private volatile LastPageRecord lastRecord;
+   private volatile LastPageRecord lastPageRecord;
    
    
    // Static --------------------------------------------------------
@@ -88,12 +89,12 @@
    // Constructors --------------------------------------------------
    
    
-   public PagingStoreImpl(final SequentialFileFactory fileFactory, final PagingStoreFactory storeFactory, final SimpleString storeName, final long maxPageSize) 
+   public PagingStoreImpl(final SequentialFileFactory fileFactory, final SimpleString storeName, final long maxPageSize, QueueSettings queueSettings) 
    {
-      this.storeFactory = storeFactory;
       this.fileFactory = fileFactory;
       this.storeName = storeName;
       this.maxPageSize = maxPageSize;
+      this.queueSettings = queueSettings;
    }
    
    
@@ -101,7 +102,23 @@
 
    // PagingStore implementation ------------------------------------
    
+   public long getQueueSize()
+   {
+      return sizeInBytes.get();
+   }
    
+   public long addQueueSize(long delta)
+   {
+      return sizeInBytes.addAndGet(delta);
+   }
+
+   /** Maximum number of bytes allowed in memory */ 
+   public long getMaxSizeBytes()
+   {
+      return queueSettings.getMaxSizeBytes();
+   }
+
+   
    public boolean isPaging()
    {
       lock.readLock().lock();
@@ -134,10 +151,8 @@
    {
       validateInit();
       
-      // Read needs both global and writeLock
-      synchronizedBlockLock.acquire(); // This is a replacement synchronized.
-                            // Can't change any IDs while depaging.
-      lock.writeLock().lock();  // Wait pending writes to finish before depage.
+      positioningGlobalLock.acquire(); // Can't change currentPage or any of ids without a global lock
+      lock.writeLock().lock();  // Wait pending writes to finish before entering the block
       
       try
       {
@@ -194,7 +209,7 @@
       finally
       {
          lock.writeLock().unlock();
-         synchronizedBlockLock.release();
+         positioningGlobalLock.release();
       }
       
    }
@@ -206,11 +221,10 @@
       int bytesToWrite = fileFactory.calculateBlockSize(message.getEncodeSize() + PageImpl.SIZE_RECORD);
       
       
-      // This would be a synchronized block... (but using a Semaphore)
-      synchronizedBlockLock.acquire();
+      // The only thing single-threaded done on paging is positioning and check-files (verifying if we need to open a new page file)
+      positioningGlobalLock.acquire();
 
-      // The only thing single-threaded done on paging is positioning and check-files (verifying if we need to open a new page file)
-      // After we have it allocated we keep all the threads working until we need to move to a new file (in which case we demand a writeLock, to wait for the writes to finish)
+      // After we have it locked we keep all the threads working until we need to move to a new file (in which case we demand a writeLock, to wait for the writes to finish)
       try
       {
          if (currentPage == null)
@@ -220,7 +234,7 @@
          
          if ((pageUsedSize.addAndGet(bytesToWrite) > maxPageSize && currentPage.getNumberOfMessages() > 0))
          {
-            // Wait any pending transaction on the current page to finish before we can open another page.
+            // Wait any pending write on the current page to finish before we can open another page.
             lock.writeLock().lock();
             try
             {
@@ -241,7 +255,7 @@
       }
       finally
       {
-         synchronizedBlockLock.release();
+         positioningGlobalLock.release();
       }
       
       // End of a synchronized block..
@@ -315,12 +329,12 @@
    
    public LastPageRecord getLastRecord()
    {
-      return lastRecord;
+      return lastPageRecord;
    }
 
    public void setLastRecord(LastPageRecord record)
    {
-      this.lastRecord = record;
+      this.lastPageRecord = record;
    }
 
    
@@ -410,7 +424,16 @@
    {
       validateInit();
 
-      synchronizedBlockLock.acquire();
+      positioningGlobalLock.acquire();
+      
+      // StartPaging would change positioning (by changing currentPage), because of that it needs to be in a synchronized block.
+      // Case this lock becomes a contention, we will need to implement the dual-lock antipattern (which I tried to avoid):
+      //      if (currentPage == null)
+      //      {
+      //           synchronizedBlockLock.acquire();
+      //           if (currentPage == null) // this dual-verification should be fine as currentPage is volatile
+      //              etc, etc... 
+                      
       try
       {
          if (currentPage == null)
@@ -425,7 +448,7 @@
       }
       finally
       {
-         synchronizedBlockLock.release();
+         positioningGlobalLock.release();
       }
    }
    
@@ -505,7 +528,7 @@
       
       file.close();
       
-      return new PageImpl(fileFactory, file, storeFactory, page);
+      return new PageImpl(fileFactory, file, page);
    }
    
    /**
@@ -555,11 +578,11 @@
                Page page = depage();
                if (page == null)
                {
-                  if (lastRecord != null)
+                  if (lastPageRecord != null)
                   {
-                     listener.clearLastRecord(lastRecord);
+                     listener.clearLastPageRecord(lastPageRecord);
                   }
-                  lastRecord = null;
+                  lastPageRecord = null;
                   break;
                }
                page.open();

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-08-28 03:50:18 UTC (rev 4884)
@@ -34,7 +34,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
@@ -70,17 +69,17 @@
 public class PostOfficeImpl implements PostOffice
 {  
    
-   private static final long MAX_SIZE = 100 * 1024 * 1024;
-   
    private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
    
-   private static final boolean isTrace = log.isTraceEnabled();
+   //private static final boolean isTrace = log.isTraceEnabled();
+   private static final boolean isTrace = true;
    
    // This is just a debug tool method.
    // During debugs you could make log.trace as log.info, and change the variable isTrace above
    private static void trace(String message)
    {
-      log.trace(message);
+      //log.trace(message);
+      log.info(message);
    }
    
    //private final int nodeID;
@@ -93,10 +92,6 @@
    
    private final ConcurrentMap<SimpleString, FlowController> flowControllers = new ConcurrentHashMap<SimpleString, FlowController>();
    
-   private final ConcurrentMap<SimpleString, AtomicLong> queueSize = new ConcurrentHashMap<SimpleString, AtomicLong>();
-   
-   private final AtomicLong totalSize = new AtomicLong(0);
-   
    private final QueueFactory queueFactory;
    
    private final boolean checkAllowable;
@@ -246,16 +241,7 @@
    public List<MessageReference> route(final ServerMessage message) throws Exception
    {
       
-      if (pager.addSize(message) > MAX_SIZE)
-      {
-         // TODO: move this inside the Pager
-         PagingStore store = pagingManager.getPageStore(message.getDestination());
-
-         if (store.startPaging())
-         {
-            log.info("Starting paging on " + message.getDestination());
-         }
-      }
+      pager.addSize(message);
       
       SimpleString address = message.getDestination();
       
@@ -328,30 +314,8 @@
    	return flowControllers.get(address);
    }
    
-   public long getSize(SimpleString destination)
-   {
-      return getQueueSize(destination).get();
-   }
-
    // Private -----------------------------------------------------------------
    
- 
-   private AtomicLong getQueueSize(SimpleString destination)
-   {
-      AtomicLong size = this.queueSize.get(destination);
-      if (size == null)
-      {
-         size = new AtomicLong(0);
-         AtomicLong oldSize = this.queueSize.putIfAbsent(destination, size);
-         if (oldSize != null)
-         {
-            size = oldSize;
-         }
-      }
-      
-      return size;
-   }
-   
    private Binding createBinding(final SimpleString address, final SimpleString name, final Filter filter,
                                  final boolean durable, final boolean temporary)
    {
@@ -453,23 +417,17 @@
       for (SimpleString destination: dests)
       {
          PagingStore store = pagingManager.getPageStore(destination);
-         startDepageThread(store);
+         store.startDequeueThread(pager);
       }
    }
 
-   private boolean startDepageThread(PagingStore store) throws Exception
-   {
-      return store.startDequeueThread(this.pager);
-   }
-   
-   
    // TODO this probably will become a separate class?
    private class PagerImpl implements Pager
    {
       
       private final ConcurrentMap</*TransactionID*/ Long , PageTransactionInfo> transactions = new ConcurrentHashMap<Long, PageTransactionInfo>();
 
-      public void clearLastRecord(LastPageRecord lastRecord) throws Exception
+      public void clearLastPageRecord(LastPageRecord lastRecord) throws Exception
       {
          trace("Clearing lastRecord information " + lastRecord.getLastId());
          storageManager.storeDelete(lastRecord.getRecordId());
@@ -573,7 +531,7 @@
             ref.getQueue().addLast(ref);
          }
          
-         return PostOfficeImpl.this.getQueueSize(destination).get() < MAX_SIZE; 
+         return pagingStore.getQueueSize() < pagingStore.getMaxSizeBytes(); 
       }
       
       public void loadLastPage(LastPageRecord lastPage) throws Exception
@@ -589,27 +547,11 @@
       
       public void messageDone(ServerMessage message) throws Exception
       {
-         final long size = addSize(message.getDestination(), message.getEncodeSize() * -1);
-
-         if (size < MAX_SIZE)
-         {
-            PagingStore store = pagingManager.getPageStore(message.getDestination());
-            
-            if (startDepageThread(store))
-            {
-               log.info("Starting depaging Thread, size = " + size);
-            }
-         }
+         addSize(message.getDestination(), message.getEncodeSize() * -1);
       }
       
-      private long addSize(SimpleString destination, long size)
-      {
-         totalSize.addAndGet(size);
-         return getQueueSize(destination).addAndGet(size);
-      }
-      
       /** To be called when a rollback is called after messageDone was called */
-      public long addSize(ServerMessage message) throws Exception
+      public long addSize(final ServerMessage message) throws Exception
       {
          return addSize(message.getDestination(), message.getEncodeSize());      
       }
@@ -642,7 +584,6 @@
             this.transactions.remove(pageTrans);
          }
       }
-
       
       public void sync(Collection<SimpleString> destinationsToSync) throws Exception
       {
@@ -651,7 +592,46 @@
             pagingManager.getPageStore(destination).sync();
          }
       }
+      
+      
+      
+      private long addSize(final SimpleString destination, final long size) throws Exception
+      {
+         final PagingStore store = pagingManager.getPageStore(destination);
+         
+         final long addressSize = store.addQueueSize(size);
+         
+         final long maxSize = store.getMaxSizeBytes();
 
+         if (size > 0)
+         {
+            
+            if (maxSize > 0 && addressSize > maxSize)
+            {
+               if (store.startPaging())
+               {
+                  if (isTrace)
+                  {
+                     trace("Starting paging on " + destination + ", size = " + addressSize + ", maxSize=" + maxSize);
+                  }
+               }
+            }
+         }
+         else
+         {
+            if ( maxSize > 0 && addressSize < maxSize)
+            {
+               if (store.startDequeueThread(this))
+               {
+                  log.info("Starting depaging Thread, size = " + addressSize);
+               }
+            }
+         }
+         
+         return addressSize;
+      }
+      
 
+
    }
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-08-28 03:50:18 UTC (rev 4884)
@@ -34,14 +34,11 @@
 import org.jboss.messaging.core.management.MessagingServerManagement;
 import org.jboss.messaging.core.management.impl.MessagingServerManagementImpl;
 import org.jboss.messaging.core.paging.PagingManager;
-import org.jboss.messaging.core.paging.PagingStoreFactory;
-import org.jboss.messaging.core.paging.impl.PageMessageImpl;
 import org.jboss.messaging.core.paging.impl.PagingManagerFactoryNIO;
 import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
-import org.jboss.messaging.core.remoting.Interceptor;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.RemotingService;
@@ -90,11 +87,11 @@
    // wired components
 
    private SecurityStore securityStore;
-   private HierarchicalRepository<QueueSettings> queueSettingsRepository = new HierarchicalObjectRepository<QueueSettings>();
+   private final HierarchicalRepository<QueueSettings> queueSettingsRepository = new HierarchicalObjectRepository<QueueSettings>();
    private ScheduledExecutorService scheduledExecutor;   
    private QueueFactory queueFactory;
    private PostOffice postOffice;
-   private ExecutorFactory executorFactory = new OrderedExecutorFactory(Executors.newCachedThreadPool(new JBMThreadFactory("JBM-async-session-delivery-threads")));
+   private final ExecutorFactory executorFactory = new OrderedExecutorFactory(Executors.newCachedThreadPool(new JBMThreadFactory("JBM-async-session-delivery-threads")));
    private HierarchicalRepository<Set<Role>> securityRepository;
    private ResourceManager resourceManager;   
    private MessagingServerPacketHandler serverPacketHandler;
@@ -172,7 +169,7 @@
       scheduledExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), new JBMThreadFactory("JBM-scheduled-threads"));                  
       queueFactory = new QueueFactoryImpl(scheduledExecutor, queueSettingsRepository);
       
-      PagingManager pagingManager = new PagingManagerImpl(new PagingManagerFactoryNIO(configuration.getPagingDirectory(), configuration.getPageFileSize()));
+      PagingManager pagingManager = new PagingManagerImpl(new PagingManagerFactoryNIO(configuration.getPagingDirectory(), configuration.getPageFileSize()), queueSettingsRepository);
       
       postOffice = new PostOfficeImpl(storageManager, pagingManager, 
             queueFactory, configuration.isRequireDestinations());

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-08-28 03:50:18 UTC (rev 4884)
@@ -533,11 +533,12 @@
 
    private synchronized HandleStatus add(final MessageReference ref, final boolean first)
    {
-      if (maxSizeBytes != -1 && sizeBytes.get() + ref.getMessage().getEncodeSize() >= maxSizeBytes)
-      {
-         return HandleStatus.BUSY;              
-      }
-      
+        // TODO: Verify what this following clause means
+//      if (maxSizeBytes != -1 && sizeBytes.get() + ref.getMessage().getEncodeSize() >= maxSizeBytes)
+//      {
+//         return HandleStatus.BUSY;              
+//      }
+
       if (!first)
       {
          messagesAdded.incrementAndGet();

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-08-28 03:50:18 UTC (rev 4884)
@@ -35,6 +35,9 @@
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.tests.util.RandomUtil;
 import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.util.SimpleString;
@@ -61,8 +64,12 @@
    
    public void testPagingManagerNIO() throws Exception
    {
+      HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
+      queueSettings.setDefault(new QueueSettings());
+      
+      
       PagingManagerImpl managerImpl = 
-         new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir, 1024*1024));
+         new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir, 1024*1024), queueSettings);
       managerImpl.start();
       
       PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test"));

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/fakes/FakeManagerFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/fakes/FakeManagerFactory.java	2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/fakes/FakeManagerFactory.java	2008-08-28 03:50:18 UTC (rev 4884)
@@ -1,63 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.tests.unit.core.paging.fakes;
-
-import org.jboss.messaging.core.journal.SequentialFileFactory;
-import org.jboss.messaging.core.paging.impl.PagingManagerFactoryNIO;
-import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
-
-public class FakeManagerFactory extends PagingManagerFactoryNIO
-{
-
-   public FakeManagerFactory(long pageSize)
-   {
-      super("", pageSize);
-   }
-   // Constants -----------------------------------------------------
-   
-   // Attributes ----------------------------------------------------
-   
-   // Static --------------------------------------------------------
-   
-   // Constructors --------------------------------------------------
-   
-   // Public --------------------------------------------------------
-   
-   // Package protected ---------------------------------------------
-   
-   // Protected -----------------------------------------------------
-
-
-   @Override
-   protected SequentialFileFactory newFileFactory(String destinationDirectory)
-   {
-      return new FakeSequentialFileFactory();
-   }
-   
-   
-   // Private -------------------------------------------------------
-   
-   // Inner classes -------------------------------------------------
-   
-}

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java	2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java	2008-08-28 03:50:18 UTC (rev 4884)
@@ -34,7 +34,6 @@
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
-import org.jboss.messaging.tests.unit.core.paging.fakes.FakeManagerFactory;
 import org.jboss.messaging.tests.util.RandomUtil;
 import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.util.SimpleString;
@@ -67,7 +66,7 @@
       
       SequentialFile file = factory.createSequentialFile("00010.page", 1);
       
-      PageImpl impl = new PageImpl(factory, file, new FakeManagerFactory(1024), 10);
+      PageImpl impl = new PageImpl(factory, file, 10);
       
       assertEquals(10, impl.getPageId());
       
@@ -107,7 +106,7 @@
       
       file = factory.createSequentialFile("00010.page", 1);
       file.open();
-      impl = new PageImpl(factory, file, new FakeManagerFactory(1024), 10);
+      impl = new PageImpl(factory, file, 10);
       
       PageMessage msgs[] = impl.read();
       
@@ -117,7 +116,7 @@
       
       for (int i = 0; i < msgs.length; i++)
       {
-         assertEquals((long)i, msgs[i].getMessage().getMessageID());
+         assertEquals((long)0, msgs[i].getMessage().getMessageID());
          
          assertEquals(simpleDestination, msgs[i].getMessage().getDestination());
          

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-08-28 03:50:18 UTC (rev 4884)
@@ -32,6 +32,9 @@
 import org.jboss.messaging.core.paging.PagingStoreFactory;
 import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
 import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.util.SimpleString;
 
@@ -42,6 +45,15 @@
    
    // Attributes ----------------------------------------------------
    
+   
+   private static HierarchicalRepository<QueueSettings> repoSettings = new HierarchicalObjectRepository<QueueSettings>();
+   static
+   {
+      repoSettings.setDefault(new QueueSettings());
+   }
+
+
+   
    // Static --------------------------------------------------------
    
    // Constructors --------------------------------------------------
@@ -51,8 +63,11 @@
    
    public void testGetStore() throws Exception
    {
+      HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
+      queueSettings.setDefault(new QueueSettings());
+
       PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
-      PagingManagerImpl manager = new PagingManagerImpl(spi);
+      PagingManagerImpl manager = new PagingManagerImpl(spi, queueSettings);
       
       SimpleString destination = new SimpleString("some-destination");
 
@@ -69,7 +84,7 @@
       
       PagingStore store = EasyMock.createNiceMock(PagingStore.class);
       
-      EasyMock.expect(spi.newStore(destination)).andReturn(store);
+      EasyMock.expect(spi.newStore(EasyMock.eq(destination), EasyMock.isA(QueueSettings.class))).andReturn(store);
       
       store.start();
       
@@ -105,7 +120,7 @@
    public void testMultipleThreadsGetStore() throws Exception
    {
       PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
-      final PagingManagerImpl manager = new PagingManagerImpl(spi);
+      final PagingManagerImpl manager = new PagingManagerImpl(spi, repoSettings);
       
       final SimpleString destination = new SimpleString("some-destination");
 
@@ -113,9 +128,9 @@
       
       EasyMock.expect(factory.listFiles(EasyMock.isA(String.class))).andStubReturn(new ArrayList<String>());
       
-      PagingStoreImpl storeImpl = new PagingStoreImpl(factory, spi, destination, 1);
+      PagingStoreImpl storeImpl = new PagingStoreImpl(factory, destination, 1, new QueueSettings());
       
-      EasyMock.expect(spi.newStore(destination)).andStubReturn(storeImpl);
+      EasyMock.expect(spi.newStore(EasyMock.eq(destination), EasyMock.isA(QueueSettings.class))).andStubReturn(storeImpl);
       
       EasyMock.replay(spi, factory);
       

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-08-28 03:50:18 UTC (rev 4884)
@@ -34,8 +34,8 @@
 import org.jboss.messaging.core.paging.impl.PageMessageImpl;
 import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
 import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
-import org.jboss.messaging.tests.unit.core.paging.fakes.FakeManagerFactory;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -62,7 +62,7 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
       
-      PagingStore storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 2);
+      PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 2, new QueueSettings());
       
       storeImpl.start();
       
@@ -78,7 +78,7 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
       
-      PagingStore storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 2);
+      PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 2, new QueueSettings());
       
       storeImpl.start();
       
@@ -90,12 +90,12 @@
       
       List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
 
-      ByteBuffer buffer = createRandomBuffer(10);
+      ByteBuffer buffer = createRandomBuffer(0, 10);
       
       buffers.add(buffer);
       SimpleString destination = new SimpleString("test");
 
-      PageMessageImpl msg = createMessage(1l, destination, buffer);
+      PageMessageImpl msg = createMessage(destination, buffer);
       
       assertTrue(storeImpl.isPaging());
       
@@ -105,7 +105,7 @@
       
       storeImpl.sync();
       
-      storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 2);
+      storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 2, new QueueSettings());
       
       storeImpl.start();
       
@@ -117,7 +117,7 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
       
-      PagingStore storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 10);
+      PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 10, new QueueSettings());
       
       storeImpl.start();
       
@@ -132,11 +132,11 @@
       for (int i = 0; i < 10; i++)
       {
 
-         ByteBuffer buffer = createRandomBuffer(10);
+         ByteBuffer buffer = createRandomBuffer(i+1l, 10);
          
          buffers.add(buffer);
    
-         PageMessageImpl msg = createMessage(i+1l, destination, buffer);
+         PageMessageImpl msg = createMessage(destination, buffer);
 
          assertTrue(storeImpl.page(msg));
       }
@@ -163,7 +163,7 @@
       
       for (int i = 0; i < 10; i++)
       {
-         assertEquals(i + 1l, msg[i].getMessage().getMessageID());
+         assertEquals(0, msg[i].getMessage().getMessageID());
          assertEqualsByteArrays(buffers.get(i).array(), msg[i].getMessage().getBody().array());
       }
       
@@ -173,7 +173,7 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
       
-      TestSupportPageStore storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 10);
+      TestSupportPageStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 10, new QueueSettings());
       
       storeImpl.start();
       
@@ -190,7 +190,7 @@
       for (int i = 0; i < 10; i++)
       {
 
-         ByteBuffer buffer = createRandomBuffer(10);
+         ByteBuffer buffer = createRandomBuffer(i+1l, 10);
          
          buffers.add(buffer);
    
@@ -200,7 +200,7 @@
          }
          
          
-         PageMessageImpl msg = createMessage(i+1l, destination, buffer);
+         PageMessageImpl msg = createMessage(destination, buffer);
 
          assertTrue(storeImpl.page(msg));
       }
@@ -224,7 +224,7 @@
          
          for (int i = 0; i < 5; i++)
          {
-            assertEquals(pageNr*5 + i + 1l, msg[i].getMessage().getMessageID());
+            assertEquals(0, msg[i].getMessage().getMessageID());
             assertEqualsByteArrays(buffers.get(pageNr*5 + i).array(), msg[i].getMessage().getBody().array());
          }
       }
@@ -233,7 +233,7 @@
       
       assertTrue(storeImpl.isPaging());
 
-      PageMessageImpl msg = createMessage(100, destination, buffers.get(0));
+      PageMessageImpl msg = createMessage(destination, buffers.get(0));
       
       assertTrue(storeImpl.page(msg));
       
@@ -267,7 +267,7 @@
       
       assertEquals(1, msgs.length);
       
-      assertEquals(100l, msgs[0].getMessage().getMessageID());
+      assertEquals(0l, msgs[0].getMessage().getMessageID());
       
       assertEqualsByteArrays(buffers.get(0).array(), msgs[0].getMessage().getBody().array());
       

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-08-28 03:50:18 UTC (rev 4884)
@@ -41,7 +41,7 @@
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
-import org.jboss.messaging.tests.unit.core.paging.fakes.FakeManagerFactory;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.tests.util.RandomUtil;
 import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.util.SimpleString;
@@ -83,7 +83,7 @@
       
       final ArrayList<Page> readPages = new ArrayList<Page>();
       
-      final TestSupportPageStore storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), new SimpleString("test"), MAX_SIZE);
+      final TestSupportPageStore storeImpl = new PagingStoreImpl(factory, new SimpleString("test"), MAX_SIZE, new QueueSettings());
       
       storeImpl.start();
       
@@ -112,7 +112,7 @@
                while (true)
                {
                   long id = messageIdGenerator.incrementAndGet();
-                  PageMessageImpl msg = createMessage(id, destination, createRandomBuffer(5));
+                  PageMessageImpl msg = createMessage(destination, createRandomBuffer(id, 5));
                   if (storeImpl.page(msg))
                   {
                      buffers.put(id, msg);
@@ -207,8 +207,12 @@
          
          for (PageMessage msg : msgs)
          {
-            PageMessageImpl msgWritten = buffers.remove(msg.getMessage().getMessageID());
-            buffers2.put(msg.getMessage().getMessageID(), msg);
+            msg.getMessage().getBody().rewind();
+            long id = msg.getMessage().getBody().getLong();
+            msg.getMessage().getBody().rewind();
+            
+            PageMessageImpl msgWritten = buffers.remove(id);
+            buffers2.put(id, msg);
             assertNotNull(msgWritten);
             assertEquals (msg.getMessage().getDestination(), msgWritten.getMessage().getDestination());
             assertEqualsByteArrays(msgWritten.getMessage().getBody().array(), msg.getMessage().getBody().array());
@@ -229,7 +233,7 @@
          fileTmp.close();         
       }
       
-      TestSupportPageStore storeImpl2 = new PagingStoreImpl(factory, new FakeManagerFactory(0), new SimpleString("test"), MAX_SIZE);
+      TestSupportPageStore storeImpl2 = new PagingStoreImpl(factory, new SimpleString("test"), MAX_SIZE, new QueueSettings());
       storeImpl2.start();
       
       int numberOfPages = storeImpl2.getNumberOfPages();
@@ -245,7 +249,7 @@
       assertEquals(numberOfPages, storeImpl2.getNumberOfPages());
       
       long lastMessageId = messageIdGenerator.incrementAndGet();
-      PageMessage lastMsg = createMessage(lastMessageId, destination, createRandomBuffer(5));
+      PageMessage lastMsg = createMessage(destination, createRandomBuffer(lastMessageId, 5));
       
       storeImpl2.page(lastMsg);
       buffers2.put(lastMessageId, lastMsg);
@@ -270,7 +274,9 @@
          for (PageMessage msg: msgs)
          {
             
-            PageMessage msgWritten = buffers2.remove(msg.getMessage().getMessageID());
+            msg.getMessage().getBody().rewind();
+            long id = msg.getMessage().getBody().getLong();
+            PageMessage msgWritten = buffers2.remove(id);
             assertNotNull(msgWritten);
             assertEquals (msg.getMessage().getDestination(), msgWritten.getMessage().getDestination());
             assertEqualsByteArrays(msgWritten.getMessage().getBody().array(), msg.getMessage().getBody().array());
@@ -283,7 +289,8 @@
       lastPage.close();
       assertEquals(1, lastMessages.length);
       
-      assertEquals(lastMessages[0].getMessage().getMessageID(), lastMessageId);
+      lastMessages[0].getMessage().getBody().rewind();
+      assertEquals(lastMessages[0].getMessage().getBody().getLong(), lastMessageId);
       assertEqualsByteArrays(lastMessages[0].getMessage().getBody().array(), lastMsg.getMessage().getBody().array());
       
       assertEquals(0, buffers2.size());
@@ -291,22 +298,22 @@
       
    }
 
-   protected PageMessageImpl createMessage(long messageId, SimpleString destination, ByteBuffer buffer)
+   protected PageMessageImpl createMessage(SimpleString destination, ByteBuffer buffer)
    {
       ServerMessage msg = new ServerMessageImpl((byte)1, true, 0,
             System.currentTimeMillis(), (byte)0, new ByteBufferWrapper(buffer));
       
-      msg.setMessageID((long)messageId);
-      
       msg.setDestination(destination);
       return new PageMessageImpl(msg);
    }
 
-   protected ByteBuffer createRandomBuffer(int size)
+   protected ByteBuffer createRandomBuffer(long id, int size)
    {
-      ByteBuffer buffer = ByteBuffer.allocate(size);
+      ByteBuffer buffer = ByteBuffer.allocate(size + 8);
       
-      for (int j = 0; j < buffer.limit(); j++)
+      buffer.putLong(id);
+      
+      for (int j = 8; j < buffer.limit(); j++)
       {
          buffer.put(RandomUtil.randomByte());
       }




More information about the jboss-cvs-commits mailing list