[jboss-cvs] JBoss Messaging SVN: r4861 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/paging/impl and 10 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Aug 22 00:06:45 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-08-22 00:06:45 -0400 (Fri, 22 Aug 2008)
New Revision: 4861

Added:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java
Removed:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/DepageListener.java
Modified:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransaction.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageTransactionImplTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
First attempt on transactions and paging

Deleted: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/DepageListener.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/DepageListener.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/DepageListener.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -1,55 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.paging;
-
-import org.jboss.messaging.core.journal.EncodingSupport;
-import org.jboss.messaging.util.SimpleString;
-
-public interface DepageListener<T extends EncodingSupport>
-{
-   
-   // Constants -----------------------------------------------------
-   
-   // Attributes ----------------------------------------------------
-   
-   // Static --------------------------------------------------------
-   
-   // Constructors --------------------------------------------------
-   
-   // Public --------------------------------------------------------
-
-   /**
-    * @return false if the listener can't handle more pages
-    */
-   boolean onDepage(SimpleString destination, T[] data) throws Exception;
-   
-   // Package protected ---------------------------------------------
-   
-   // Protected -----------------------------------------------------
-   
-   // Private -------------------------------------------------------
-   
-   // Inner classes -------------------------------------------------
-   
-}

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -23,7 +23,7 @@
 
 package org.jboss.messaging.core.paging;
 
-import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.core.paging.impl.PageMessage;
 
 /**
  * 
@@ -34,14 +34,14 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public interface Page<T extends EncodingSupport>
+public interface Page
 {
    
    int getPageId();
    
-   void write(T message) throws Exception;
+   void write(PageMessage message) throws Exception;
    
-   T[] read() throws Exception;
+   PageMessage[] read() throws Exception;
    
    int getSize();
    

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransaction.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransaction.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransaction.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -34,18 +34,24 @@
 public interface PageTransaction extends EncodingSupport
 {
 
+   boolean isCommitted();
+   
+   void complete();
+
    long getRecordID();
 
+   void setRecordID(long id);
+
    long getTransactionID();
    
-   void setTransactionID(long transactionID);
+   int addMessage(SimpleString destination);
    
-   void addMessage(SimpleString destination);
+   SimpleString[] getDestinations();
    
-   void decrementMessage(SimpleString destination, int numberOfMessages);
+   int decrement();
    
-   int getSize(SimpleString destination);
+   int decrement(int elements);
    
-   boolean isEmpty();
+   int getSize();
 
 }

Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -0,0 +1,102 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging;
+
+import org.jboss.messaging.core.paging.impl.PageMessage;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ * Listener and paging entry point used by the paging stores. It replaces the
+ * former generic DepageListener; the type parameter has been removed.
+ */
+public interface Pager
+{
+   
+   /**
+    * @return false if the listener can't handle more pages
+    */
+   boolean onDepage(SimpleString destination, PageMessage[] data) throws Exception;
+   
+   /** 
+    * Depage could depage messages for transactions not committed yet,
+    * So we need to eventually send pending messages
+    * */
+   void beginTransaction(long transactionID, PageTransaction pageTransaction);
+   
+   /** 
+    * Depage could depage messages for transactions not committed yet,
+    * So we need to eventually send pending messages
+    * */
+   void commitTransaction(long transactionID, PageTransaction pageTransaction);
+   
+   /**
+    * Need to clear any information about the transaction, any eventual data will be ignored 
+    * */
+   void rollbackTransaction(long transactionID);
+   
+   
+   
+   /**
+    * To be used by transactions only.
+    * If you intend to page whenever the destination is paging, just call page and check its return value.
+    * @param destination
+    * @return
+    */
+   boolean isPaging(SimpleString destination) throws Exception;
+   
+   /**
+    * Page, only if destination is in page mode.
+    * @param message
+    * @return false if destination is not in page mode
+    */
+   boolean page(ServerMessage message) throws Exception;
+   
+   /**
+    * Page, only if destination is in page mode.
+    * 
+    * page is an atomic operation. It's better to call page and test the return value.
+    * 
+    * @param message
+    * @return false if destination is not in page mode
+    */
+   boolean page(ServerMessage message, long transactionID) throws Exception;
+   
+   /** 
+    * 
+    * To be called when there are no more references to the message
+    * @param message
+    */
+   void messageDone(ServerMessage message) throws Exception;
+   
+   /** To be called when a rollback is called after messageDone was called */
+   long addSize(ServerMessage message) throws Exception;
+
+   void sync(PageTransaction pageTransaction) throws Exception;
+   
+   
+}

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -22,7 +22,6 @@
 
 package org.jboss.messaging.core.paging;
 
-import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.server.MessagingComponent;
 import org.jboss.messaging.util.SimpleString;
 
@@ -33,7 +32,7 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
-public interface PagingManager<T extends EncodingSupport> extends MessagingComponent
+public interface PagingManager extends MessagingComponent
 {
-   public PagingStore<T> getPageStore(SimpleString storeName) throws Exception;
+   public PagingStore getPageStore(SimpleString storeName) throws Exception;
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -23,10 +23,9 @@
 
 package org.jboss.messaging.core.paging;
 
-import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.paging.impl.PageMessage;
 import org.jboss.messaging.core.server.MessagingComponent;
 import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.core.journal.EncodingSupport;
 
 /**
  * 
@@ -37,7 +36,7 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public interface PagingStore<T extends EncodingSupport> extends MessagingComponent
+public interface PagingStore extends MessagingComponent
 {
    
    int getNumberOfPages();
@@ -51,7 +50,7 @@
    
    void sync() throws Exception;
    
-   boolean page(T message) throws Exception;
+   boolean page(PageMessage message) throws Exception;
    
    /** 
     * Remove the first page from the Writing Queue.
@@ -60,7 +59,7 @@
     * @return
     * @throws Exception 
     */
-   Page<T> depage() throws Exception;
+   Page depage() throws Exception;
    
    /**
     * 
@@ -68,6 +67,6 @@
     * @return false if a thread was already started, or if not in page mode
     * @throws Exception 
     */
-   boolean startDequeueThread(DepageListener<T> listener) throws Exception;
+   boolean startDequeueThread(Pager listener) throws Exception;
 
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -23,7 +23,7 @@
 
 package org.jboss.messaging.core.paging;
 
-import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.core.paging.impl.PageMessage;
 
 
 /**
@@ -32,11 +32,11 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public interface PagingStoreFactory<T extends EncodingSupport>
+public interface PagingStoreFactory
 {
 
-   PagingStore<T> newStore(org.jboss.messaging.util.SimpleString destinationName);
-   T newElement();
-   T[] newArray(int size);
+   PagingStore newStore(org.jboss.messaging.util.SimpleString destinationName);
+   PageMessage newElement();
+   PageMessage[] newArray(int size);
    
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -27,7 +27,6 @@
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.journal.IOCallback;
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
@@ -41,7 +40,7 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public class PageImpl<T extends EncodingSupport> implements Page<T>
+public class PageImpl implements Page
 {
    
    // Constants -----------------------------------------------------
@@ -60,7 +59,7 @@
    // Attributes ----------------------------------------------------
    
    private final int pageId;
-   private final PagingStoreFactory<T> storeFactory;
+   private final PagingStoreFactory storeFactory;
    private final AtomicInteger numberOfMessages = new AtomicInteger(0);
    private final SequentialFile file;
    private final SequentialFileFactory fileFactory;
@@ -71,7 +70,7 @@
    
    // Constructors --------------------------------------------------
    
-   public PageImpl(final SequentialFileFactory factory, final SequentialFile file, PagingStoreFactory<T> storeFactory, final int pageId) throws Exception
+   public PageImpl(final SequentialFileFactory factory, final SequentialFile file, PagingStoreFactory storeFactory, final int pageId) throws Exception
    {
       this.pageId = pageId;
       this.file = file;
@@ -99,10 +98,10 @@
       return pageId;
    }
    
-   public T[] read() throws Exception
+   public PageMessage[] read() throws Exception
    {
       
-      ArrayList<T> messages = new ArrayList<T>();
+      ArrayList<PageMessage> messages = new ArrayList<PageMessage>();
 
       ByteBuffer buffer = fileFactory.newBuffer((int)file.size());
       file.position(0);
@@ -124,7 +123,7 @@
                int oldPos = buffer.position();
                if (buffer.position() + messageSize < buffer.limit() && buffer.get(oldPos + messageSize) == END_BYTE)
                {
-                  T msg = instantiateObject();
+                  PageMessage msg = instantiateObject();
                   msg.decode(messageBuffer);
                   messages.add(msg);
                }
@@ -145,7 +144,7 @@
       return messages.toArray(instantiateArray(messages.size()));
    }
    
-   public void write(final T message) throws Exception
+   public void write(final PageMessage message) throws Exception
    {
       ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + SIZE_RECORD);
       buffer.put(START_BYTE);
@@ -212,13 +211,13 @@
    
    // Protected -----------------------------------------------------
    
-   protected  T instantiateObject()
+   protected  PageMessage instantiateObject()
    {
       return storeFactory.newElement();
    }
 
    
-   protected T[] instantiateArray(final int size)
+   protected PageMessage[] instantiateArray(final int size)
    {
       return storeFactory.newArray(size);
    }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -23,12 +23,11 @@
 
 package org.jboss.messaging.core.paging.impl;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.messaging.core.paging.PageTransaction;
 import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.util.ConcurrentHashSet;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -43,28 +42,29 @@
    
    // Attributes ----------------------------------------------------
    
-   final long recordID;
-   
    long transactionID;
+   long recordID;
+   boolean committed = false;
    
-   final Map<SimpleString, AtomicInteger> destinations = new HashMap<SimpleString, AtomicInteger>();
+   final AtomicInteger numberOfMessages = new AtomicInteger(0);
+
+   /** This is a transient field, not being persisted */
+   final ConcurrentHashSet<SimpleString> destinations = new ConcurrentHashSet<SimpleString>();
    
    
    // Static --------------------------------------------------------
    
    // Constructors --------------------------------------------------
    
-   public PageTransactionImpl(final long recordID, final long transactionID)
+   public PageTransactionImpl(final long transactionID)
    {
-      this.recordID = recordID;
       this.transactionID = transactionID;
    }
 
-   public PageTransactionImpl(final long recordID)
+   public PageTransactionImpl()
    {
-      this.recordID = recordID;
    }
-   
+
    // Public --------------------------------------------------------
 
    
@@ -73,109 +73,87 @@
       return recordID;
    }
    
+   public void setRecordID(long recordID)
+   {
+      this.recordID = recordID;
+   }
+   
    public long getTransactionID()
    {
       return transactionID;
    }
    
-   public void setTransactionID(final long transactionID)
+   public int addMessage(SimpleString destination)
    {
-      this.transactionID = transactionID;
+      this.destinations.add(destination);
+      
+      return numberOfMessages.incrementAndGet();
    }
    
-   public synchronized void addMessage(final SimpleString destination)
+   public int decrement()
    {
-      AtomicInteger value = destinations.get(destination);
-      if (value == null)
+      final int value = numberOfMessages.decrementAndGet();
+      if (value < 0)
       {
-         destinations.put(destination, new AtomicInteger(1));
+         throw new IllegalStateException("Internal error Negative value on Paging transactions!");
       }
-      else
-      {
-         value.incrementAndGet();
-      }
+      
+      return value;
    }
    
-   public synchronized void decrementMessage(final SimpleString destination, final int numberOfMessages)
+   public int decrement(int elements)
    {
-      AtomicInteger value = destinations.get(destination);
-      if (value == null)
+      final int value = numberOfMessages.addAndGet(elements * -1);
+      if (value < 0)
       {
-         throw new IllegalStateException("Can't find counter for destination " + destination);
+         throw new IllegalStateException("Internal error Negative value on Paging transactions!");
       }
-      if (value.addAndGet(numberOfMessages * -1) < 0)
-      {
-         throw new IllegalStateException("Negative value on destination " + destination + " at PageTransaction");
-      }
+      
+      return value;
    }
    
-   public synchronized int getSize(final SimpleString destination)
+   public int getSize()
    {
-      AtomicInteger value = destinations.get(destination);
-      if (value == null)
-      {
-         return 0;
-      }
-      else
-      {
-         return value.intValue();
-      }
+      return numberOfMessages.get();
    }
    
-   public synchronized boolean isEmpty()
+   public SimpleString[] getDestinations()
    {
-      for(Map.Entry<SimpleString, AtomicInteger> element: destinations.entrySet())
-      {
-         if (element.getValue().intValue() != 0)
-         {
-            return false;
-         }
-      }
-      
-      return true;
+      return destinations.toArray(new SimpleString[destinations.size()]);
    }
    
-   
    // EncodingSupport implementation 
    
    public synchronized void decode(final MessagingBuffer buffer)
    {
       this.transactionID = buffer.getLong();
-      final int numberOfElements = buffer.getInt();
-      destinations.clear();
-      
-      for (int i = 0; i < numberOfElements; i++)
-      {
-         SimpleString str = buffer.getSimpleString();
-         AtomicInteger numberOfMessages = new AtomicInteger(buffer.getInt());
-         destinations.put(str, numberOfMessages);
-      }
+      this.recordID = buffer.getLong();
+      this.numberOfMessages.set(buffer.getInt());
    }
    
    public synchronized void encode(final MessagingBuffer buffer)
    {
-      
+      this.committed = true; // if it is being read, certainly it was committed
       buffer.putLong(this.transactionID);
-      buffer.putInt(destinations.size());
-      
-      for(Map.Entry<SimpleString, AtomicInteger> element: destinations.entrySet())
-      {
-         buffer.putSimpleString(element.getKey());
-         buffer.putInt(element.getValue().intValue());
-      }
+      buffer.putLong(this.recordID);
+      buffer.putInt(this.numberOfMessages.get());
    }
 
    public synchronized int getEncodeSize()
    {
-      int size = 0;
-      for(Map.Entry<SimpleString, AtomicInteger> element: destinations.entrySet())
-      {
-         size += SimpleString.sizeofString(element.getKey());
-      }
-
-      return size + destinations.size() * 4 + 4 + 8;
+      return 8*2 /* long */ + 4 /* int */;
    }
    
+   public boolean isCommitted()
+   {
+      return committed;
+   }
+   
+   public void complete()
+   {
+      committed = true;
+   }
+   
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -37,7 +37,7 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public class PagingManagerFactoryNIO implements PagingStoreFactory<PageMessage>
+public class PagingManagerFactoryNIO implements PagingStoreFactory
 {
    
    // Constants -----------------------------------------------------
@@ -59,13 +59,13 @@
    
    // Public --------------------------------------------------------
 
-   public PagingStore<PageMessage> newStore(SimpleString destinationName)
+   public PagingStore newStore(SimpleString destinationName)
    {
       final String destinationDirectory = directory + "/" + destinationName.toString();
       File destinationFile = new File(destinationDirectory);
       destinationFile.mkdirs();
       
-      return new PagingStoreImpl<PageMessage>(newFileFactory(destinationDirectory), this, destinationName, pageSize);
+      return new PagingStoreImpl(newFileFactory(destinationDirectory), this, destinationName, pageSize);
    }
 
    public PageMessage[] newArray(int size)

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -26,7 +26,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.paging.PagingStoreFactory;
@@ -37,7 +36,7 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public class PagingManagerImpl<T extends EncodingSupport> implements PagingManager<T>
+public class PagingManagerImpl implements PagingManager
 {
    // Constants -----------------------------------------------------
    
@@ -45,32 +44,32 @@
    
    private volatile boolean started = false;
    
-   private final ConcurrentMap<SimpleString, PagingStore<T>> stores = new ConcurrentHashMap<SimpleString, PagingStore<T>>();
+   private final ConcurrentMap<SimpleString, PagingStore> stores = new ConcurrentHashMap<SimpleString, PagingStore>();
    
-   private final PagingStoreFactory<T> pagingSPI;
+   private final PagingStoreFactory pagingSPI;
    
    // Static --------------------------------------------------------
    
    // Constructors --------------------------------------------------
    
-   public PagingManagerImpl(final PagingStoreFactory<T> pagingSPI)
+   public PagingManagerImpl(final PagingStoreFactory pagingSPI)
    {
       this.pagingSPI = pagingSPI;
    }
    
    // Public --------------------------------------------------------
    
-   public PagingStore<T> getPageStore(final SimpleString storeName) throws Exception
+   public PagingStore getPageStore(final SimpleString storeName) throws Exception
    {
       validateStarted();
       
-      PagingStore<T> store = stores.get(storeName);
+      PagingStore store = stores.get(storeName);
       if (store == null)
       {
          
          store = newStore(storeName);
          
-         PagingStore<T> oldStore = stores.putIfAbsent(storeName, store);
+         PagingStore oldStore = stores.putIfAbsent(storeName, store);
          
          if (oldStore != null)
          {
@@ -99,7 +98,7 @@
    {
       this.started = false;
       
-      for (PagingStore<T> store: stores.values())
+      for (PagingStore store: stores.values())
       {
          store.stop();
       }
@@ -112,7 +111,7 @@
    
    // Private -------------------------------------------------------
    
-   private PagingStore<T> newStore(final SimpleString destinationName)
+   private PagingStore newStore(final SimpleString destinationName)
    {
       return pagingSPI.newStore(destinationName);
    }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -34,7 +34,7 @@
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.paging.DepageListener;
+import org.jboss.messaging.core.paging.Pager;
 import org.jboss.messaging.core.paging.Page;
 import org.jboss.messaging.core.paging.PagingStoreFactory;
 import org.jboss.messaging.util.SimpleString;
@@ -44,7 +44,7 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public class PagingStoreImpl<T extends EncodingSupport> implements TestSupportPageStore<T>
+public class PagingStoreImpl implements TestSupportPageStore
 {
 
    // Constants -----------------------------------------------------
@@ -58,7 +58,7 @@
    
    private final SimpleString storeName;
    
-   private final PagingStoreFactory<T> storeFactory;
+   private final PagingStoreFactory storeFactory;
    
    private final SequentialFileFactory fileFactory;
    
@@ -70,7 +70,7 @@
    private volatile int numberOfPages;
    private volatile int firstPageId = Integer.MAX_VALUE;
    private volatile int currentPageId;
-   private volatile Page<T> currentPage;
+   private volatile Page currentPage;
 
    // This is supposed to perform better than synchronized methods
    private final Semaphore globalLock = new Semaphore(1);
@@ -84,7 +84,7 @@
    // Constructors --------------------------------------------------
    
    
-   public PagingStoreImpl(final SequentialFileFactory fileFactory, PagingStoreFactory<T> storeFactory, final SimpleString storeName, final long maxPageSize) 
+   public PagingStoreImpl(final SequentialFileFactory fileFactory, PagingStoreFactory storeFactory, final SimpleString storeName, final long maxPageSize) 
    {
       this.storeFactory = storeFactory;
       this.fileFactory = fileFactory;
@@ -121,10 +121,12 @@
       return storeName;
    }
    
-   public Page<T> depage() throws Exception
+   /** It returns one of the Pages. It doesn't perform reading by itself. */
+   public Page depage() throws Exception
    {
       validateInit();
       
+      // Read needs both global and writeLock
       globalLock.acquire();
       lock.writeLock().lock();
       
@@ -140,7 +142,7 @@
 
             numberOfPages--;
             
-            final Page<T> returnPage;
+            final Page returnPage;
             if (currentPageId == firstPageId)
             {
                firstPageId = Integer.MAX_VALUE;
@@ -188,7 +190,7 @@
       
    }
 
-   public boolean page(T message) throws Exception
+   public boolean page(PageMessage message) throws Exception
    {
       validateInit();
       
@@ -266,7 +268,7 @@
       }
    }
    
-   public boolean startDequeueThread(final DepageListener<T> listener) throws Exception
+   public boolean startDequeueThread(final Pager listener) throws Exception
    {
       if (!isPaging())
       {
@@ -453,7 +455,7 @@
    }
 
 
-   private Page<T> createPage(int page) throws Exception
+   private Page createPage(int page) throws Exception
    {
       String fileName = createFileName(page);
       SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
@@ -471,7 +473,7 @@
       
       file.close();
       
-      return new PageImpl<T>(fileFactory, file, storeFactory, page);
+      return new PageImpl(fileFactory, file, storeFactory, page);
    }
    
    /**
@@ -503,9 +505,9 @@
    
    class DequeueThread extends Thread
    {
-      final DepageListener<T> listener;
+      final Pager listener;
       
-      public DequeueThread(final DepageListener<T> listener)
+      public DequeueThread(final Pager listener)
       {
          this.listener = listener;
       }
@@ -518,14 +520,14 @@
             boolean needMorePages = false;
             do
             {
-               Page<T> page = depage();
+               Page page = depage();
                if (page == null)
                {
                   break;
                }
                page.open();
-               T messages[] = page.read();
-               listener.onDepage(PagingStoreImpl.this.storeName, messages);
+               PageMessage messages[] = page.read();
+               needMorePages = listener.onDepage(PagingStoreImpl.this.storeName, messages);
                page.delete();
             }
             while (needMorePages);

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -25,14 +25,12 @@
 
 import org.jboss.messaging.core.paging.PagingStore;
 
-import org.jboss.messaging.core.journal.EncodingSupport;
-
 /**
  * All the methods required to TestCases on  PageStoreImpl
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public interface TestSupportPageStore<T extends EncodingSupport> extends PagingStore<T>
+public interface TestSupportPageStore extends PagingStore
 {
    void forceAnotherPage() throws Exception;
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.jboss.messaging.core.paging.PageTransaction;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.MessageReference;
@@ -72,6 +73,12 @@
    void rollback(long txID) throws Exception;
       
    
+   void storePageTransaction(long txID, PageTransaction pageTransaction) throws Exception;
+
+   void updatePageTransaction(long txID, PageTransaction pageTransaction) throws Exception;
+
+   
+   
    void updateDeliveryCount(MessageReference ref) throws Exception;     
    
    void loadMessages(PostOffice postOffice, Map<Long, Queue> queues) throws Exception;

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -47,6 +47,7 @@
 import org.jboss.messaging.core.journal.impl.JournalImpl;
 import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.PageTransaction;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
@@ -96,6 +97,8 @@
    
    public static final byte UPDATE_DELIVERY_COUNT = 33;
    
+   public static final byte PAGE_TRANSACTION = 34;
+   
    public static final byte SET_SCHEDULED_DELIVERY_TIME = 44;
   	
 	private final AtomicLong messageIDSequence = new AtomicLong(0);
@@ -216,7 +219,17 @@
    {
       messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), ADD_MESSAGE, message);
    }
-   
+
+   public void storePageTransaction(long txID, PageTransaction pageTransaction) throws Exception
+   {
+      messageJournal.appendAddRecordTransactional(txID, pageTransaction.getRecordID(), PAGE_TRANSACTION, pageTransaction);
+   }
+
+   public void updatePageTransaction(long txID, PageTransaction pageTransaction) throws Exception
+   {
+      messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(), PAGE_TRANSACTION, pageTransaction);
+   }
+
    public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception
    {
    	EncodingSupport record = ackBytes(queueID, messageID);
@@ -245,6 +258,7 @@
    }
    
    // Other operations
+
    
 	public void updateDeliveryCount(final MessageReference ref) throws Exception
 	{

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -26,6 +26,7 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.jboss.messaging.core.paging.PageTransaction;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
@@ -117,7 +118,15 @@
 	{
 	}
 
-	public void updateDeliveryCount(MessageReference ref) throws Exception
+	public void storePageTransaction(long txID, PageTransaction pageTransaction) throws Exception
+   {
+   }
+
+   public void updatePageTransaction(long txID, PageTransaction pageTransaction) throws Exception
+   {
+   }
+
+   public void updateDeliveryCount(MessageReference ref) throws Exception
 	{
 	}
 
@@ -155,5 +164,5 @@
 	{
 	   return started;
 	}
-  
+
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -27,6 +27,8 @@
 import java.util.Set;
 
 import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.paging.Pager;
+import org.jboss.messaging.core.paging.impl.PageMessage;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.MessagingComponent;
 import org.jboss.messaging.core.server.ServerMessage;
@@ -80,33 +82,8 @@
    Map<SimpleString, List<Binding>> getMappings();
 
    Set<SimpleString> listAllDestinations();
-
-   void routeAndDeliver(final ServerMessage msg) throws Exception;
    
+   Pager getPager();
    
-   /**
-    * To be used by transactions only.
-    * If you're sure you will page if isPaging, just call the method page and look at its return. 
-    * @param destination
-    * @return
-    */
-   boolean isPaging(SimpleString destination) throws Exception;
    
-   /**
-    * Page, only if destination is in page mode.
-    * @param message
-    * @return false if destination is not on page mode
-    */
-   boolean page(ServerMessage message) throws Exception;
-   
-   /** 
-    * 
-    * To be called when there are no more references to the message
-    * @param message
-    */
-   void messageDone(ServerMessage message) throws Exception;
-   
-   /** To be called when a rollback is called after messageDone was called */
-   long addSize(ServerMessage message) throws Exception;
-   
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -25,6 +25,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -37,7 +38,8 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.paging.DepageListener;
+import org.jboss.messaging.core.paging.Pager;
+import org.jboss.messaging.core.paging.PageTransaction;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.paging.impl.PageMessage;
@@ -63,7 +65,7 @@
 public class PostOfficeImpl implements PostOffice
 {  
    
-   private static final long MAX_SIZE = 100 * 1024 * 1024;
+   private static final long MAX_SIZE = 1000000;
    
    private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
    
@@ -87,12 +89,14 @@
    
    private final StorageManager storageManager;
    
-   private final PagingManager<PageMessage> pagingManager;
+   private final PagingManager pagingManager;
    
+   private final Pager pager = new PagerImpl();
+   
    private volatile boolean started;
     
    public PostOfficeImpl(final StorageManager storageManager,
-                         final PagingManager<PageMessage> pagingManager,
+                         final PagingManager pagingManager,
    		                final QueueFactory queueFactory, final boolean checkAllowable)
    {
       this.storageManager = storageManager;
@@ -225,25 +229,12 @@
       return nameMap.get(queueName);
    }
    
-   public void routeAndDeliver(final ServerMessage message) throws Exception
+   public List<MessageReference> route(final ServerMessage message) throws Exception
    {
-      List<MessageReference> refs = this.route(message);
       
-      if (message.getDurableRefCount() != 0)
+      if (pager.addSize(message) > MAX_SIZE)
       {
-         storageManager.storeMessage(message);
-      }
-      
-      for (MessageReference ref : refs)
-      {
-         ref.getQueue().addLast(ref);
-      }
-   }
-         
-   public List<MessageReference> route(final ServerMessage message) throws Exception
-   {
-      if (addSize(message.getDestination(), message.getEncodeSize()) > MAX_SIZE)
-      {
+         // TODO: move this inside the Pager
          PagingStore store = pagingManager.getPageStore(message.getDestination());
 
          if (store.startPaging())
@@ -307,35 +298,11 @@
 //      }
 //   }
    
-
-   public boolean isPaging(SimpleString destination) throws Exception
+   public Pager getPager()
    {
-      return pagingManager.getPageStore(destination).isPaging();
+      return this.pager;
    }
-   
-   public void messageDone(ServerMessage message) throws Exception
-   {
-      final long size = addSize(message.getDestination(), message.getEncodeSize() * -1);
-      
-      if (size < MAX_SIZE)
-      {
-         PagingStore<PageMessage> store = pagingManager.getPageStore(message.getDestination());
-         
-         startDepageThread(store);
-      }
-   }
-   
-   /** To be called when a rollback is called after messageDone was called */
-   public long addSize(ServerMessage message) throws Exception
-   {
-      return addSize(message.getDestination(), message.getEncodeSize());      
-   }
-   
 
-   public boolean page(ServerMessage message) throws Exception
-   {
-      return pagingManager.getPageStore(message.getDestination()).page(new PageMessage(message));
-   }
 
    public Map<SimpleString, List<Binding>> getMappings()
    {
@@ -355,12 +322,6 @@
    // Private -----------------------------------------------------------------
    
  
-   private long addSize(SimpleString destination, long size)
-   {
-      totalSize.addAndGet(size);
-      return getQueueSize(destination).addAndGet(size);
-   }
-   
    private AtomicLong getQueueSize(SimpleString destination)
    {
       AtomicLong size = this.queueSize.get(destination);
@@ -477,28 +438,169 @@
       
       for (SimpleString destination: dests)
       {
-         PagingStore<PageMessage> store = pagingManager.getPageStore(destination);
+         PagingStore store = pagingManager.getPageStore(destination);
          startDepageThread(store);
       }
    }
 
-   private void startDepageThread(PagingStore<PageMessage> store) throws Exception
+   private void startDepageThread(PagingStore store) throws Exception
    {
-      store.startDequeueThread(new PagingListener());
+      store.startDequeueThread(new PagerImpl());
    }
    
    
-   private class PagingListener implements DepageListener<PageMessage>
+   // TODO this probably will become a separate class?
+   private class PagerImpl implements Pager
    {
+      
+      private final ConcurrentMap</*TransactionID*/ Long , PageTransaction> transactions = new ConcurrentHashMap<Long, PageTransaction>();
 
+      /**
+       * This method will remove files from the page system and add them into the journal, doing it transactionally
+       * 
+       * A Transaction will be opened only if persistent messages are used.
+       * If persistent messages are also used, it will update any affected PageTransactions.
+       */
       public boolean onDepage(final SimpleString destination, final PageMessage[] data) throws Exception
       {
+         log.info("Depaging....");
+         long transactionID = storageManager.generateTransactionID();
+         boolean usedTransaction = false;
+         
+         HashSet<PageTransaction> pageTransactionsToUpdate = new HashSet<PageTransaction>();
+         
+         final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
+         
          for (PageMessage msg: data)
          {
-            routeAndDeliver(msg.getMessage());
+            PageTransaction trUsed = null;
+            if (msg.getTransactionID() > 0)
+            {
+               trUsed = transactions.get(msg.getTransactionID());
+               if (trUsed == null)
+               {
+                  // TODO make it .trace
+                  log.info("Transaction " + msg.getTransactionID() + " not found, ignoring message " + msg.getMessage().getMessageID());
+                  continue;
+               }
+               else if (!trUsed.isCommitted())
+               {
+                  log.info("Transaction " + msg.getTransactionID() + " is pending... we don't know what to do yet... ignoring the message for now but this is not acceptable");
+                  continue;
+               }
+            }
+
+            if (msg.getMessage().isDurable() && trUsed != null)
+            {
+               pageTransactionsToUpdate.add(trUsed);
+               trUsed.decrement();
+            }
+            
+            refsToAdd.addAll(PostOfficeImpl.this.route(msg.getMessage()));
+            
+            if (msg.getMessage().getDurableRefCount() != 0)
+            {
+               usedTransaction = true;
+               storageManager.storeMessageTransactional(transactionID, msg.getMessage());
+            }
          }
+         
+         for (PageTransaction pageTrans: pageTransactionsToUpdate)
+         {
+            if (pageTrans.getSize() == 0)
+            {
+               storageManager.storeDelete(pageTrans.getRecordID());
+            }
+            else
+            {
+               storageManager.updatePageTransaction(transactionID, pageTrans);
+            }
+            usedTransaction = true;
+         }
+         
+         if (usedTransaction)
+         {
+            storageManager.commit(transactionID);
+         }
+         
+         for (MessageReference ref : refsToAdd)
+         {
+            ref.getQueue().addLast(ref);
+         }
+
+         
          return PostOfficeImpl.this.getQueueSize(destination).get() < MAX_SIZE; 
       }
       
+
+      // Transaction
+      public void beginTransaction(long transactionID, PageTransaction pageTransaction)
+      {
+         transactions.putIfAbsent(transactionID, pageTransaction);
+      }
+      
+      public void commitTransaction(long transactionID, PageTransaction pageTransaction)
+      {
+         transactions.putIfAbsent(transactionID, pageTransaction);
+         // TODO: What to do with pending transactions during depage?
+      }
+      
+      public void rollbackTransaction(long transactionID)
+      {
+         transactions.remove(transactionID);
+      }
+      
+      public boolean isPaging(SimpleString destination) throws Exception
+      {
+         return pagingManager.getPageStore(destination).isPaging();
+      }
+      
+      public void messageDone(ServerMessage message) throws Exception
+      {
+         final long size = addSize(message.getDestination(), message.getEncodeSize() * -1);
+
+         if (size < MAX_SIZE)
+         {
+            System.out.println("Starting depage Thread, size = " + size);
+            PagingStore store = pagingManager.getPageStore(message.getDestination());
+            
+            startDepageThread(store);
+         }
+      }
+      
+      private long addSize(SimpleString destination, long size)
+      {
+         totalSize.addAndGet(size);
+         return getQueueSize(destination).addAndGet(size);
+      }
+      
+      /** To be called when a rollback is called after messageDone was called */
+      public long addSize(ServerMessage message) throws Exception
+      {
+         return addSize(message.getDestination(), message.getEncodeSize());      
+      }
+      
+      public boolean page(ServerMessage message) throws Exception
+      {
+         return pagingManager.getPageStore(message.getDestination()).page(new PageMessage(message));
+      }
+
+      public boolean page(ServerMessage message, long transactionID) throws Exception
+      {
+         return pagingManager.getPageStore(message.getDestination()).page(new PageMessage(message, transactionID));
+      }
+
+
+      public void sync(PageTransaction pageTransaction) throws Exception
+      {
+         SimpleString[] destinations = pageTransaction.getDestinations();
+         for (SimpleString destination: destinations)
+         {
+            pagingManager.getPageStore(destination).sync();
+         }
+      }
+      
+      
+
    }
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -172,7 +172,7 @@
       scheduledExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), new JBMThreadFactory("JBM-scheduled-threads"));                  
       queueFactory = new QueueFactoryImpl(scheduledExecutor, queueSettingsRepository);
       
-      PagingManager<PageMessage> pagingManager = new PagingManagerImpl<PageMessage>(new PagingManagerFactoryNIO(configuration.getPagingDirectory(), configuration.getPageFileSize()));
+      PagingManager pagingManager = new PagingManagerImpl(new PagingManagerFactoryNIO(configuration.getPagingDirectory(), configuration.getPageFileSize()));
       
       postOffice = new PostOfficeImpl(storageManager, pagingManager, 
             queueFactory, configuration.isRequireDestinations());

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -39,6 +39,8 @@
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.filter.impl.FilterImpl;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.Pager;
+import org.jboss.messaging.core.paging.impl.PageMessage;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.FlowController;
@@ -128,6 +130,8 @@
    private final ResourceManager resourceManager;
    
    private final PostOffice postOffice;
+   
+   private final Pager pager;
 
    private final SecurityStore securityStore;
    
@@ -161,6 +165,8 @@
       
       this.postOffice = postOffice;
       
+      this.pager = postOffice.getPager();
+      
       this.queueSettingsRepository = queueSettingsRepository;
       
       this.resourceManager = resourceManager;
@@ -306,7 +312,7 @@
       
       if (autoCommitSends)
       {
-         if (!postOffice.page(msg))
+         if (!pager.page(msg))
          {
             List<MessageReference> refs = postOffice.route(msg);
    
@@ -1143,8 +1149,12 @@
       
       if (message.decrementRefCount() == 0)
       {
-         postOffice.messageDone(message);
+         pager.messageDone(message);
       }
+      else
+      {
+         System.out.println("Still " + message.getRefCount());
+      }
       
       if (message.isDurable() && queue.isDurable())
       {

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -22,8 +22,18 @@
 
 package org.jboss.messaging.core.transaction.impl;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.transaction.xa.Xid;
+
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.Pager;
+import org.jboss.messaging.core.paging.PageTransaction;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.MessageReference;
@@ -32,11 +42,9 @@
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.Transaction;
-import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.core.paging.impl.PageMessage;
+import org.jboss.messaging.core.paging.impl.PageTransactionImpl;
 
-import javax.transaction.xa.Xid;
-import java.util.*;
-
 /**
  * A TransactionImpl
  *
@@ -46,14 +54,19 @@
 {
    private static final Logger log = Logger.getLogger(TransactionImpl.class);
 
+   
    private final StorageManager storageManager;
 
    private final PostOffice postOffice;
+   
+   private final Pager pager;
 
    private final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
 
    private final List<MessageReference> acknowledgements = new ArrayList<MessageReference>();
 
+   private PageTransaction pageTransaction; 
+
    private final Xid xid;
 
    private final long id;
@@ -70,6 +83,15 @@
       this.storageManager = storageManager;
 
       this.postOffice = postOffice;
+      
+      if (postOffice == null)
+      {
+         pager = null;
+      }
+      else
+      {
+         this.pager = postOffice.getPager();
+      }
 
       this.xid = null;
 
@@ -82,6 +104,8 @@
       this.storageManager = storageManager;
 
       this.postOffice = postOffice;
+      
+      this.pager = postOffice.getPager();
 
       this.xid = xid;
 
@@ -102,16 +126,31 @@
       {
          throw new IllegalStateException("Transaction is in invalid state " + state);
       }
+      
+      if (pager.page(message, this.id))
+      {
+         if (message.isDurable())
+         {
+            containsPersistent = true;
+            getPageTransaction().addMessage(message.getDestination());
+         }
+         else
+         {
+            getPageTransaction();
+         }
+      }
+      else
+      {
+         List<MessageReference> refs = postOffice.route(message);
 
-      List<MessageReference> refs = postOffice.route(message);
+         refsToAdd.addAll(refs);
 
-      refsToAdd.addAll(refs);
+         if (message.getDurableRefCount() != 0)
+         {
+            storageManager.storeMessageTransactional(id, message);
 
-      if (message.getDurableRefCount() != 0)
-      {
-         storageManager.storeMessageTransactional(id, message);
-
-         containsPersistent = true;
+            containsPersistent = true;
+         }
       }
    }
 
@@ -129,9 +168,9 @@
       
       if (message.decrementRefCount() == 0)
       {
-         if (postOffice != null)
+         if (pager != null)
          {
-            postOffice.messageDone(message);
+            pager.messageDone(message);
          }
       }
 
@@ -183,6 +222,10 @@
 
       if (containsPersistent)
       {
+         if (this.pageTransaction != null)
+         {
+            storageManager.storePageTransaction(this.id, pageTransaction);
+         }
          storageManager.prepare(id);
       }
 
@@ -218,11 +261,25 @@
          }
       }
 
+      
       if (containsPersistent)
       {
+         if (this.pageTransaction != null && state != State.PREPARED)
+         {
+            storageManager.storePageTransaction(this.id, pageTransaction);
+            pager.sync(pageTransaction);
+         }
          storageManager.commit(id);
       }
 
+
+      // TODO: What to do if a depage happens in the middle of a transaction that is not committed yet?
+      //       This would be a problem for any solution applied to transactions & paging
+      if (pageTransaction != null)
+      {
+         pager.commitTransaction(id, pageTransaction);
+      }      
+      
       for (MessageReference ref : refsToAdd)
       {
          ref.getQueue().addLast(ref);
@@ -259,6 +316,11 @@
       {
          storageManager.rollback(id);
       }
+      
+      if (pageTransaction != null)
+      {
+         pager.rollbackTransaction(id);
+      }
 
       Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
 
@@ -281,7 +343,7 @@
          // Putting back the size to control paging
          if (message.incrementRefCount() == 1)
          {
-            postOffice.addSize(message);
+            pager.addSize(message);
          }
          
          message.incrementRefCount();
@@ -372,6 +434,19 @@
    // Private
    // -------------------------------------------------------------------
 
+   private PageTransaction getPageTransaction()
+   {
+      if (pageTransaction == null)
+      {
+         long pageTRID = storageManager.generateMessageID();
+         pageTransaction = new PageTransactionImpl(id);
+         pageTransaction.setRecordID(pageTRID);
+         pager.beginTransaction(id, pageTransaction);
+      }
+      
+      return pageTransaction;
+   }
+   
    private void clear()
    {
       refsToAdd.clear();

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -60,11 +60,11 @@
    
    public void testPagingManagerNIO() throws Exception
    {
-      PagingManagerImpl<PageMessage> managerImpl = 
-         new PagingManagerImpl<PageMessage>(new PagingManagerFactoryNIO(journalDir, 1024*1024));
+      PagingManagerImpl managerImpl = 
+         new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir, 1024*1024));
       managerImpl.start();
       
-      PagingStore<PageMessage> store = managerImpl.getPageStore(new SimpleString("simple-test"));
+      PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test"));
       
       ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
       
@@ -74,7 +74,7 @@
       
       assertTrue(store.page(new PageMessage(msg)));
       
-      Page<PageMessage> page = store.depage();
+      Page page = store.depage();
       
       page.open();
       

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -28,6 +28,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.paging.Pager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.FlowController;
 import org.jboss.messaging.core.postoffice.PostOffice;
@@ -161,6 +162,19 @@
    {
       throw new IllegalStateException("Not implemented!");
    }
+
+   public boolean page(ServerMessage message, long transactionID)
+         throws Exception
+   {
+      // TODO: stub - this fake does not implement paging; always returns false
+      return false;
+   }
+
+   public Pager getPager()
+   {
+      // TODO: stub - this fake provides no Pager; always returns null
+      return null;
+   }
    
    
    

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -66,7 +66,7 @@
       
       SequentialFile file = factory.createSequentialFile("00010.page", 1);
       
-      PageImpl<PageMessage> impl = new PageImpl<PageMessage>(factory, file, new FakeManagerFactory(1024), 10);
+      PageImpl impl = new PageImpl(factory, file, new FakeManagerFactory(1024), 10);
       
       assertEquals(10, impl.getPageId());
       
@@ -106,7 +106,7 @@
       
       file = factory.createSequentialFile("00010.page", 1);
       file.open();
-      impl = new PageImpl<PageMessage>(factory, file, new FakeManagerFactory(1024), 10);
+      impl = new PageImpl(factory, file, new FakeManagerFactory(1024), 10);
       
       PageMessage msgs[] = impl.read();
       

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -32,7 +32,6 @@
 import org.jboss.messaging.core.paging.PagingStoreFactory;
 import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
 import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
-import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.util.SimpleString;
 
@@ -105,7 +104,7 @@
    
    public void testMultipleThreadsGetStore() throws Exception
    {
-      PagingStoreFactory<ServerMessage> spi = EasyMock.createMock(PagingStoreFactory.class);
+      PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
       final PagingManagerImpl manager = new PagingManagerImpl(spi);
       
       final SimpleString destination = new SimpleString("some-destination");
@@ -114,7 +113,7 @@
       
       EasyMock.expect(factory.listFiles(EasyMock.isA(String.class))).andStubReturn(new ArrayList<String>());
       
-      PagingStoreImpl<ServerMessage> storeImpl = new PagingStoreImpl<ServerMessage>(factory, spi, destination, 1);
+      PagingStoreImpl storeImpl = new PagingStoreImpl(factory, spi, destination, 1);
       
       EasyMock.expect(spi.newStore(destination)).andStubReturn(storeImpl);
       

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageTransactionImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageTransactionImplTest.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageTransactionImplTest.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -55,26 +55,23 @@
    {
       long id1 = RandomUtil.randomLong();
       long id2 = RandomUtil.randomLong();
-      PageTransaction trans = new PageTransactionImpl(id1, id2);
+      PageTransaction trans = new PageTransactionImpl(id2);
       
-      SimpleString dest1 = RandomUtil.randomSimpleString();
-      SimpleString dest2 = RandomUtil.randomSimpleString();
+      trans.setRecordID(id1);
+      
+      // anything from 2 to 99 (inclusive); at least 2 so that both destinations receive a message
+      int nr1 = RandomUtil.randomPositiveInt()%98 + 2;
 
-      int nr1 = RandomUtil.randomPositiveInt()%100;
-      int nr2 = RandomUtil.randomPositiveInt()%100;
-
+      SimpleString str1 = RandomUtil.randomSimpleString();
+      SimpleString str2 = RandomUtil.randomSimpleString();
+      
       for (int i = 0; i < nr1; i++)
       {
-         trans.addMessage(dest1);
+         trans.addMessage(i%2 == 0? str1: str2);
       }
       
-      for (int i = 0; i < nr2; i++)
-      {
-         trans.addMessage(dest2);
-      }
       
-      assertEquals(nr1, trans.getSize(dest1));
-      assertEquals(nr2, trans.getSize(dest2));
+      assertEquals(nr1, trans.getSize());
       
       ByteBuffer buffer = ByteBuffer.allocate(trans.getEncodeSize());
       MessagingBuffer wrapper = new ByteBufferWrapper(buffer);
@@ -88,28 +85,24 @@
       assertEquals(id1, trans2.getRecordID());
       assertEquals(id2, trans2.getTransactionID());
       
-      assertEquals(nr1, trans2.getSize(dest1));
-      assertEquals(nr2, trans2.getSize(dest2));
+      assertEquals(nr1, trans2.getSize());
       
-      trans.decrementMessage(dest1, nr1);
-      trans2.decrementMessage(dest1, nr1);
+      trans.decrement(nr1);
       
-      trans.decrementMessage(dest2, nr2);
-      trans2.decrementMessage(dest2, nr2);
+      assertEquals(0, trans.getSize());
       
-      assertTrue(trans.isEmpty());
-      assertTrue(trans.isEmpty());
-      
-      
       try
       {
-         trans.decrementMessage(dest1, 1000);
+         trans.decrement();
          fail("Exception expected!");
       }
       catch (Throwable ignored)
       {
       }
       
+      
+      assertEquals(2, trans.getDestinations().length);
+      
    }
    
    // Package protected ---------------------------------------------

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -33,7 +33,6 @@
 import org.jboss.messaging.core.paging.impl.PageMessage;
 import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
 import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
-import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
 import org.jboss.messaging.tests.unit.core.paging.fakes.FakeManagerFactory;
 import org.jboss.messaging.util.SimpleString;
@@ -95,7 +94,7 @@
       buffers.add(buffer);
       SimpleString destination = new SimpleString("test");
 
-      ServerMessage msg = createMessage(1l, destination, buffer);
+      PageMessage msg = createMessage(1l, destination, buffer);
       
       assertTrue(storeImpl.isPaging());
       
@@ -117,7 +116,7 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
       
-      PagingStore<PageMessage> storeImpl = new PagingStoreImpl<PageMessage>(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 10);
+      PagingStore storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 10);
       
       storeImpl.start();
       
@@ -136,9 +135,9 @@
          
          buffers.add(buffer);
    
-         ServerMessage msg = createMessage(i+1l, destination, buffer);
+         PageMessage msg = createMessage(i+1l, destination, buffer);
 
-         assertTrue(storeImpl.page(new PageMessage(msg, 0l)));
+         assertTrue(storeImpl.page(msg));
       }
       
       
@@ -146,7 +145,7 @@
       
       storeImpl.sync();
       
-      Page<PageMessage> page = storeImpl.depage();
+      Page page = storeImpl.depage();
       
       page.open();
       
@@ -173,7 +172,7 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
       
-      TestSupportPageStore<PageMessage> storeImpl = new PagingStoreImpl<PageMessage>(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 10);
+      TestSupportPageStore storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 10);
       
       storeImpl.start();
       
@@ -200,9 +199,9 @@
          }
          
          
-         ServerMessage msg = createMessage(i+1l, destination, buffer);
+         PageMessage msg = createMessage(i+1l, destination, buffer);
 
-         assertTrue(storeImpl.page(new PageMessage(msg)));
+         assertTrue(storeImpl.page(msg));
       }
       
       
@@ -212,7 +211,7 @@
       
       for (int pageNr = 0; pageNr < 2; pageNr++)
       {
-         Page<PageMessage> page = storeImpl.depage();
+         Page page = storeImpl.depage();
          
          page.open();
          
@@ -233,11 +232,11 @@
       
       assertTrue(storeImpl.isPaging());
 
-      ServerMessage msg = createMessage(100, destination, buffers.get(0));
+      PageMessage msg = createMessage(100, destination, buffers.get(0));
       
-      assertTrue(storeImpl.page(new PageMessage(msg)));
+      assertTrue(storeImpl.page(msg));
       
-      Page<PageMessage> newPage = storeImpl.depage();
+      Page newPage = storeImpl.depage();
       
       newPage.open();
       
@@ -253,13 +252,13 @@
       
       assertFalse(storeImpl.isPaging());
       
-      assertFalse(storeImpl.page(new PageMessage(msg)));
+      assertFalse(storeImpl.page(msg));
       
       storeImpl.startPaging();
 
-      assertTrue(storeImpl.page(new PageMessage(msg)));
+      assertTrue(storeImpl.page(msg));
       
-      Page<PageMessage> page = storeImpl.depage();
+      Page page = storeImpl.depage();
       
       page.open();
       

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-08-22 04:06:45 UTC (rev 4861)
@@ -80,9 +80,9 @@
       
       final ConcurrentHashMap<Long, PageMessage> buffers = new ConcurrentHashMap<Long, PageMessage>();
       
-      final ArrayList<Page<PageMessage>> readPages = new ArrayList<Page<PageMessage>>();
+      final ArrayList<Page> readPages = new ArrayList<Page>();
       
-      final TestSupportPageStore<PageMessage> storeImpl = new PagingStoreImpl<PageMessage>(factory, new FakeManagerFactory(0), new SimpleString("test"), MAX_SIZE);
+      final TestSupportPageStore storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), new SimpleString("test"), MAX_SIZE);
       
       storeImpl.start();
       
@@ -111,7 +111,7 @@
                while (true)
                {
                   long id = messageIdGenerator.incrementAndGet();
-                  PageMessage msg = new PageMessage(createMessage(id, destination, createRandomBuffer(5)));
+                  PageMessage msg = createMessage(id, destination, createRandomBuffer(5));
                   if (storeImpl.page(msg))
                   {
                      buffers.put(id, msg);
@@ -198,7 +198,7 @@
       
       final ConcurrentHashMap<Long, PageMessage> buffers2 = new ConcurrentHashMap<Long, PageMessage>();
       
-      for (Page<PageMessage> page: readPages)
+      for (Page page: readPages)
       {
          page.open();
          PageMessage msgs[] = page.read();
@@ -228,7 +228,7 @@
          fileTmp.close();         
       }
       
-      TestSupportPageStore<PageMessage> storeImpl2 = new PagingStoreImpl<PageMessage>(factory, new FakeManagerFactory(0), new SimpleString("test"), MAX_SIZE);
+      TestSupportPageStore storeImpl2 = new PagingStoreImpl(factory, new FakeManagerFactory(0), new SimpleString("test"), MAX_SIZE);
       storeImpl2.start();
       
       int numberOfPages = storeImpl2.getNumberOfPages();
@@ -244,15 +244,15 @@
       assertEquals(numberOfPages, storeImpl2.getNumberOfPages());
       
       long lastMessageId = messageIdGenerator.incrementAndGet();
-      PageMessage lastMsg = new PageMessage(createMessage(lastMessageId, destination, createRandomBuffer(5)));
+      PageMessage lastMsg = createMessage(lastMessageId, destination, createRandomBuffer(5));
       
       storeImpl2.page(lastMsg);
       buffers2.put(lastMessageId, lastMsg);
       
-      Page<PageMessage> lastPage = null;
+      Page lastPage = null;
       while (true)
       {
-         Page<PageMessage> page = storeImpl2.depage();
+         Page page = storeImpl2.depage();
          if (page == null)
          {
             break;
@@ -290,7 +290,7 @@
       
    }
 
-   protected ServerMessage createMessage(long messageId, SimpleString destination, ByteBuffer buffer)
+   protected PageMessage createMessage(long messageId, SimpleString destination, ByteBuffer buffer)
    {
       ServerMessage msg = new ServerMessageImpl((byte)1, true, 0,
             System.currentTimeMillis(), (byte)0, new ByteBufferWrapper(buffer));
@@ -298,7 +298,7 @@
       msg.setMessageID((long)messageId);
       
       msg.setDestination(destination);
-      return msg;
+      return new PageMessage(msg);
    }
 
    protected ByteBuffer createRandomBuffer(int size)




More information about the jboss-cvs-commits mailing list