[jboss-cvs] JBoss Messaging SVN: r4860 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/journal/impl and 9 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Aug 21 17:42:55 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-08-21 17:42:55 -0400 (Thu, 21 Aug 2008)
New Revision: 4860

Added:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/DepageListener.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/fakes/
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/fakes/FakeManagerFactory.java
Modified:
   branches/Branch_JBMESSAGING-1314/native/bin/libJBMLibAIO64.so
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
Making the journal less dependent on Messages 

Modified: branches/Branch_JBMESSAGING-1314/native/bin/libJBMLibAIO64.so
===================================================================
(Binary files differ)

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -1964,6 +1964,7 @@
       if (System.currentTimeMillis() - lastTime > 10000)
       {
          System.out.println("Clear!!!" + reuseBuffers.size());
+         lastTime = System.currentTimeMillis();
          reuseBuffers.clear();
       }
       

Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/DepageListener.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/DepageListener.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/DepageListener.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -0,0 +1,55 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging;
+
+import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.util.SimpleString;
+
+public interface DepageListener<T extends EncodingSupport>
+{
+   
+   // Constants -----------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+   
+   // Public --------------------------------------------------------
+
+   /**
+    * @return false if the listener can't handle more pages
+    */
+   boolean onDepage(SimpleString destination, T[] data) throws Exception;
+   
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+   
+   // Private -------------------------------------------------------
+   
+   // Inner classes -------------------------------------------------
+   
+}

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java	2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -23,21 +23,25 @@
 
 package org.jboss.messaging.core.paging;
 
-import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.journal.EncodingSupport;
 
 /**
  * 
+ * Page could be used to enqueue files for any Datatype implementing EncodingSupport
+ * 
+ * Paging was written this way as it would be easier to refactor Paging to other datatypes we could require later (such as ACKs, transactions... etc).
+ * 
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public interface Page
+public interface Page<T extends EncodingSupport>
 {
    
    int getPageId();
    
-   void write(ServerMessage message) throws Exception;
+   void write(T message) throws Exception;
    
-   ServerMessage[] read() throws Exception;
+   T[] read() throws Exception;
    
    int getSize();
    

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -22,6 +22,7 @@
 
 package org.jboss.messaging.core.paging;
 
+import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.server.MessagingComponent;
 import org.jboss.messaging.util.SimpleString;
 
@@ -32,7 +33,7 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
-public interface PagingManager extends MessagingComponent
+public interface PagingManager<T extends EncodingSupport> extends MessagingComponent
 {
-   public PagingStore getPageStore(SimpleString storeName) throws Exception;
+   public PagingStore<T> getPageStore(SimpleString storeName) throws Exception;
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -25,8 +25,8 @@
 
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.MessagingComponent;
-import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.core.journal.EncodingSupport;
 
 /**
  * 
@@ -37,7 +37,7 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public interface PagingStore extends MessagingComponent
+public interface PagingStore<T extends EncodingSupport> extends MessagingComponent
 {
    
    int getNumberOfPages();
@@ -51,7 +51,7 @@
    
    void sync() throws Exception;
    
-   boolean page(ServerMessage message) throws Exception;
+   boolean page(T message) throws Exception;
    
    /** 
     * Remove the first page from the Writing Queue.
@@ -60,7 +60,7 @@
     * @return
     * @throws Exception 
     */
-   Page depage() throws Exception;
+   Page<T> depage() throws Exception;
    
    /**
     * 
@@ -68,6 +68,6 @@
     * @return false if a thread was already started, or if not in page mode
     * @throws Exception 
     */
-   boolean startDequeueThread(PostOffice postOffice, long maxSize) throws Exception;
+   boolean startDequeueThread(DepageListener<T> listener) throws Exception;
 
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java	2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -23,16 +23,20 @@
 
 package org.jboss.messaging.core.paging;
 
+import org.jboss.messaging.core.journal.EncodingSupport;
 
+
 /**
  * The integration point between the PagingManger and the File System (aka SequentialFiles)
  * 
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public interface PagingStoreFactory
+public interface PagingStoreFactory<T extends EncodingSupport>
 {
 
-   PagingStore newStore(org.jboss.messaging.util.SimpleString destinationName);   
+   PagingStore<T> newStore(org.jboss.messaging.util.SimpleString destinationName);
+   T newElement();
+   T[] newArray(int size);
    
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java	2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -27,13 +27,13 @@
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.journal.IOCallback;
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.paging.PagingStoreFactory;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.impl.ServerMessageImpl;
 import org.jboss.messaging.util.VariableLatch;
 
 /**
@@ -41,7 +41,7 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public class PageImpl implements Page
+public class PageImpl<T extends EncodingSupport> implements Page<T>
 {
    
    // Constants -----------------------------------------------------
@@ -52,7 +52,7 @@
    
    private static final int SIZE_BYTE = 1;
    
-   public static final int SIZE_RECORD = SIZE_LONG + SIZE_INTEGER + SIZE_BYTE + SIZE_BYTE; 
+   public static final int SIZE_RECORD = SIZE_BYTE + SIZE_INTEGER + SIZE_BYTE; 
    
    public static final byte START_BYTE= (byte)'{';
    public static final byte END_BYTE= (byte)'}';
@@ -60,6 +60,7 @@
    // Attributes ----------------------------------------------------
    
    private final int pageId;
+   private final PagingStoreFactory<T> storeFactory;
    private final AtomicInteger numberOfMessages = new AtomicInteger(0);
    private final SequentialFile file;
    private final SequentialFileFactory fileFactory;
@@ -70,11 +71,12 @@
    
    // Constructors --------------------------------------------------
    
-   public PageImpl(final SequentialFileFactory factory, final SequentialFile file, int pageId) throws Exception
+   public PageImpl(final SequentialFileFactory factory, final SequentialFile file, PagingStoreFactory<T> storeFactory, final int pageId) throws Exception
    {
       this.pageId = pageId;
       this.file = file;
       this.fileFactory = factory;
+      this.storeFactory = storeFactory;
       if (factory.isSupportsCallbacks())
       {
          callback = new PagingCallback();
@@ -97,10 +99,10 @@
       return pageId;
    }
    
-   public ServerMessage[] read() throws Exception
+   public T[] read() throws Exception
    {
       
-      ArrayList<ServerMessage> messages = new ArrayList<ServerMessage>();
+      ArrayList<T> messages = new ArrayList<T>();
 
       ByteBuffer buffer = fileFactory.newBuffer((int)file.size());
       file.position(0);
@@ -122,8 +124,7 @@
                int oldPos = buffer.position();
                if (buffer.position() + messageSize < buffer.limit() && buffer.get(oldPos + messageSize) == END_BYTE)
                {
-                  ServerMessage msg = instantiateObject();
-                  msg.setMessageID(buffer.getLong());
+                  T msg = instantiateObject();
                   msg.decode(messageBuffer);
                   messages.add(msg);
                }
@@ -144,12 +145,11 @@
       return messages.toArray(instantiateArray(messages.size()));
    }
    
-   public void write(final ServerMessage message) throws Exception
+   public void write(final T message) throws Exception
    {
       ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + SIZE_RECORD);
       buffer.put(START_BYTE);
-      buffer.putInt(message.getEncodeSize() + SIZE_LONG);
-      buffer.putLong(message.getMessageID());
+      buffer.putInt(message.getEncodeSize());
       message.encode(new ByteBufferWrapper(buffer));
       buffer.put(END_BYTE);
       buffer.rewind();
@@ -212,14 +212,15 @@
    
    // Protected -----------------------------------------------------
    
-   protected ServerMessage instantiateObject()
+   protected  T instantiateObject()
    {
-      return new ServerMessageImpl();
+      return storeFactory.newElement();
    }
+
    
-   protected ServerMessage[] instantiateArray(int size)
+   protected T[] instantiateArray(final int size)
    {
-      return new ServerMessageImpl[size];
+      return storeFactory.newArray(size);
    }
    
    // Private -------------------------------------------------------

Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -0,0 +1,111 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging.impl;
+
+import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+
+/**
+ * 
+ * This class is used to encapsulate ServerMessage and TransactionID on Paging
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class PageMessage implements EncodingSupport
+{
+   
+   // Constants -----------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+   
+   // Public --------------------------------------------------------
+
+   private final ServerMessage message;
+   private long transactionID;
+   
+   public PageMessage(ServerMessage message, long transactionID)
+   {
+      this.message = message;
+   }
+   
+   public PageMessage(ServerMessage message)
+   {
+      this.message = message;
+   }
+   
+   public PageMessage()
+   {
+      this(new ServerMessageImpl());
+   }
+   
+   public ServerMessage getMessage()
+   {
+      return message;
+   }
+
+   public long getTransactionID()
+   {
+      return transactionID;
+   }
+   
+   
+   // EncodingSupport implementation --------------------------------
+
+   public void decode(MessagingBuffer buffer)
+   {
+      transactionID = buffer.getLong();
+      final long messageID = buffer.getLong();
+      message.decode(buffer);
+      message.setMessageID(messageID);
+   }
+
+   public void encode(MessagingBuffer buffer)
+   {
+      buffer.putLong(transactionID);
+      buffer.putLong(message.getMessageID());
+      message.encode(buffer);
+   }
+
+   public int getEncodeSize()
+   {
+      
+      return 8 + 8 + message.getEncodeSize();
+   }
+   
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+   
+   // Private -------------------------------------------------------
+   
+   // Inner classes -------------------------------------------------
+   
+}

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java	2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -25,6 +25,7 @@
 
 import java.io.File;
 
+import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
 import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.paging.PagingStoreFactory;
@@ -36,7 +37,7 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public class PagingManagerFactoryNIO implements PagingStoreFactory
+public class PagingManagerFactoryNIO implements PagingStoreFactory<PageMessage>
 {
    
    // Constants -----------------------------------------------------
@@ -58,19 +59,34 @@
    
    // Public --------------------------------------------------------
 
-   public PagingStore newStore(SimpleString destinationName)
+   public PagingStore<PageMessage> newStore(SimpleString destinationName)
    {
       final String destinationDirectory = directory + "/" + destinationName.toString();
       File destinationFile = new File(destinationDirectory);
       destinationFile.mkdirs();
       
-      return new PagingStoreImpl(new NIOSequentialFileFactory(destinationDirectory), destinationName, pageSize);
+      return new PagingStoreImpl<PageMessage>(newFileFactory(destinationDirectory), this, destinationName, pageSize);
    }
+
+   public PageMessage[] newArray(int size)
+   {
+      return new PageMessage[size];
+   }
+
+   public PageMessage newElement()
+   {
+      return new PageMessage();
+   }
    
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------
    
+   protected SequentialFileFactory newFileFactory(final String destinationDirectory)
+   {
+      return new NIOSequentialFileFactory(destinationDirectory);
+   }
+
    // Private -------------------------------------------------------
    
    // Inner classes -------------------------------------------------

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -26,6 +26,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.paging.PagingStoreFactory;
@@ -36,7 +37,7 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public class PagingManagerImpl implements PagingManager
+public class PagingManagerImpl<T extends EncodingSupport> implements PagingManager<T>
 {
    // Constants -----------------------------------------------------
    
@@ -44,32 +45,32 @@
    
    private volatile boolean started = false;
    
-   private final ConcurrentMap<SimpleString, PagingStore> stores = new ConcurrentHashMap<SimpleString, PagingStore>();
+   private final ConcurrentMap<SimpleString, PagingStore<T>> stores = new ConcurrentHashMap<SimpleString, PagingStore<T>>();
    
-   private final PagingStoreFactory pagingSPI;
+   private final PagingStoreFactory<T> pagingSPI;
    
    // Static --------------------------------------------------------
    
    // Constructors --------------------------------------------------
    
-   public PagingManagerImpl(final PagingStoreFactory pagingSPI)
+   public PagingManagerImpl(final PagingStoreFactory<T> pagingSPI)
    {
       this.pagingSPI = pagingSPI;
    }
    
    // Public --------------------------------------------------------
    
-   public PagingStore getPageStore(final SimpleString storeName) throws Exception
+   public PagingStore<T> getPageStore(final SimpleString storeName) throws Exception
    {
       validateStarted();
       
-      PagingStore store = stores.get(storeName);
+      PagingStore<T> store = stores.get(storeName);
       if (store == null)
       {
          
          store = newStore(storeName);
          
-         PagingStore oldStore = stores.putIfAbsent(storeName, store);
+         PagingStore<T> oldStore = stores.putIfAbsent(storeName, store);
          
          if (oldStore != null)
          {
@@ -98,7 +99,7 @@
    {
       this.started = false;
       
-      for (PagingStore store: stores.values())
+      for (PagingStore<T> store: stores.values())
       {
          store.stop();
       }
@@ -111,7 +112,7 @@
    
    // Private -------------------------------------------------------
    
-   private PagingStore newStore(final SimpleString destinationName)
+   private PagingStore<T> newStore(final SimpleString destinationName)
    {
       return pagingSPI.newStore(destinationName);
    }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -30,13 +30,13 @@
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.DepageListener;
 import org.jboss.messaging.core.paging.Page;
-import org.jboss.messaging.core.paging.PagingStore;
-import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.paging.PagingStoreFactory;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -44,7 +44,7 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public class PagingStoreImpl implements PagingStore, TestSupportPageStore
+public class PagingStoreImpl<T extends EncodingSupport> implements TestSupportPageStore<T>
 {
 
    // Constants -----------------------------------------------------
@@ -58,16 +58,19 @@
    
    private final SimpleString storeName;
    
-   private final SequentialFileFactory factory;
+   private final PagingStoreFactory<T> storeFactory;
    
+   private final SequentialFileFactory fileFactory;
+   
    private final long maxPageSize;
    
    private volatile Thread dequeueThread;
    
+   
    private volatile int numberOfPages;
    private volatile int firstPageId = Integer.MAX_VALUE;
    private volatile int currentPageId;
-   private volatile Page currentPage;
+   private volatile Page<T> currentPage;
 
    // This is supposed to perform better than synchronized methods
    private final Semaphore globalLock = new Semaphore(1);
@@ -81,9 +84,10 @@
    // Constructors --------------------------------------------------
    
    
-   public PagingStoreImpl(final SequentialFileFactory factory, final SimpleString storeName, final long maxPageSize) 
+   public PagingStoreImpl(final SequentialFileFactory fileFactory, PagingStoreFactory<T> storeFactory, final SimpleString storeName, final long maxPageSize) 
    {
-      this.factory = factory;
+      this.storeFactory = storeFactory;
+      this.fileFactory = fileFactory;
       this.storeName = storeName;
       this.maxPageSize = maxPageSize;
    }
@@ -117,7 +121,7 @@
       return storeName;
    }
    
-   public Page depage() throws Exception
+   public Page<T> depage() throws Exception
    {
       validateInit();
       
@@ -136,7 +140,7 @@
 
             numberOfPages--;
             
-            final Page returnPage;
+            final Page<T> returnPage;
             if (currentPageId == firstPageId)
             {
                firstPageId = Integer.MAX_VALUE;
@@ -184,11 +188,11 @@
       
    }
 
-   public boolean page(ServerMessage message) throws Exception
+   public boolean page(T message) throws Exception
    {
       validateInit();
       
-      int bytesToWrite = factory.calculateBlockSize(message.getEncodeSize() + PageImpl.SIZE_RECORD);
+      int bytesToWrite = fileFactory.calculateBlockSize(message.getEncodeSize() + PageImpl.SIZE_RECORD);
       
       
       // This would be a synchronized block... (but using a Semaphore)
@@ -262,7 +266,7 @@
       }
    }
    
-   public boolean startDequeueThread(final PostOffice postOffice, final long maxSize) throws Exception
+   public boolean startDequeueThread(final DepageListener<T> listener) throws Exception
    {
       if (!isPaging())
       {
@@ -274,7 +278,7 @@
          {
             if (this.dequeueThread == null)
             {
-               this.dequeueThread = new DequeueThread(postOffice, maxSize);
+               this.dequeueThread = new DequeueThread(listener);
                this.dequeueThread.start();
                return true;
             }
@@ -336,7 +340,7 @@
       try
       {
          
-         List<String> files = factory.listFiles("page");
+         List<String> files = fileFactory.listFiles("page");
          
          numberOfPages = files.size();
          
@@ -449,16 +453,16 @@
    }
 
 
-   private Page createPage(int page) throws Exception
+   private Page<T> createPage(int page) throws Exception
    {
       String fileName = createFileName(page);
-      SequentialFile file = factory.createSequentialFile(fileName, 1000);
+      SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
       
       file.open();
       
       long size = file.size();
       
-      if (factory.isSupportsCallbacks() && size < maxPageSize)
+      if (fileFactory.isSupportsCallbacks() && size < maxPageSize)
       {
          file.fill((int)size, (int)maxPageSize - (int)size, (byte)0);
       }
@@ -467,7 +471,7 @@
       
       file.close();
       
-      return new PageImpl(factory, file, page);
+      return new PageImpl<T>(fileFactory, file, storeFactory, page);
    }
    
    /**
@@ -499,13 +503,11 @@
    
    class DequeueThread extends Thread
    {
-      final PostOffice postOffice;
-      final long maxSize;
+      final DepageListener<T> listener;
       
-      public DequeueThread(final PostOffice postOffice, final long maxSize)
+      public DequeueThread(final DepageListener<T> listener)
       {
-         this.postOffice = postOffice;
-         this.maxSize = maxSize;
+         this.listener = listener;
       }
       
       
@@ -513,21 +515,20 @@
       {
          try
          {
-            while (postOffice.getSize(storeName) < maxSize)
+            boolean needMorePages = false;
+            do
             {
-               Page page = depage();
+               Page<T> page = depage();
                if (page == null)
                {
                   break;
                }
                page.open();
-               ServerMessage messages[] = page.read();
-               for (ServerMessage message: messages)
-               {
-                  postOffice.routeAndDeliver(message);
-               }
+               T messages[] = page.read();
+               listener.onDepage(PagingStoreImpl.this.storeName, messages);
                page.delete();
             }
+            while (needMorePages);
          }
          catch (Exception e)
          {

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java	2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -25,12 +25,14 @@
 
 import org.jboss.messaging.core.paging.PagingStore;
 
+import org.jboss.messaging.core.journal.EncodingSupport;
+
 /**
  * All the methods required to TestCases on  PageStoreImpl
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public interface TestSupportPageStore extends PagingStore
+public interface TestSupportPageStore<T extends EncodingSupport> extends PagingStore<T>
 {
    void forceAnotherPage() throws Exception;
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -109,6 +109,4 @@
    /** To be called when a rollback is called after messageDone was called */
    long addSize(ServerMessage message) throws Exception;
    
-   long getSize(SimpleString destination) throws Exception;
-   
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -37,8 +37,10 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.DepageListener;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.impl.PageMessage;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.FlowController;
@@ -85,12 +87,12 @@
    
    private final StorageManager storageManager;
    
-   private final PagingManager pagingManager;
+   private final PagingManager<PageMessage> pagingManager;
    
    private volatile boolean started;
     
    public PostOfficeImpl(final StorageManager storageManager,
-                         final PagingManager pagingManager,
+                         final PagingManager<PageMessage> pagingManager,
    		                final QueueFactory queueFactory, final boolean checkAllowable)
    {
       this.storageManager = storageManager;
@@ -317,12 +319,9 @@
       
       if (size < MAX_SIZE)
       {
-         PagingStore manager = pagingManager.getPageStore(message.getDestination());
-         if (manager.startDequeueThread(this, MAX_SIZE))
-         {
-            log.info("Starting dequeing page for " + message.getDestination());
-         }
+         PagingStore<PageMessage> store = pagingManager.getPageStore(message.getDestination());
          
+         startDepageThread(store);
       }
    }
    
@@ -335,7 +334,7 @@
 
    public boolean page(ServerMessage message) throws Exception
    {
-      return pagingManager.getPageStore(message.getDestination()).page(message);
+      return pagingManager.getPageStore(message.getDestination()).page(new PageMessage(message));
    }
 
    public Map<SimpleString, List<Binding>> getMappings()
@@ -478,8 +477,28 @@
       
       for (SimpleString destination: dests)
       {
-         PagingStore store = pagingManager.getPageStore(destination);
-         store.startDequeueThread(this, MAX_SIZE);
+         PagingStore<PageMessage> store = pagingManager.getPageStore(destination);
+         startDepageThread(store);
       }
    }
+
+   private void startDepageThread(PagingStore<PageMessage> store) throws Exception
+   {
+      store.startDequeueThread(new PagingListener());
+   }
+   
+   
+   private class PagingListener implements DepageListener<PageMessage>
+   {
+
+      public boolean onDepage(final SimpleString destination, final PageMessage[] data) throws Exception
+      {
+         for (PageMessage msg: data)
+         {
+            routeAndDeliver(msg.getMessage());
+         }
+         return PostOfficeImpl.this.getQueueSize(destination).get() < MAX_SIZE; 
+      }
+      
+   }
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -35,6 +35,7 @@
 import org.jboss.messaging.core.management.impl.MessagingServerManagementImpl;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStoreFactory;
+import org.jboss.messaging.core.paging.impl.PageMessage;
 import org.jboss.messaging.core.paging.impl.PagingManagerFactoryNIO;
 import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
@@ -171,7 +172,7 @@
       scheduledExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), new JBMThreadFactory("JBM-scheduled-threads"));                  
       queueFactory = new QueueFactoryImpl(scheduledExecutor, queueSettingsRepository);
       
-      PagingManager pagingManager = new PagingManagerImpl(new PagingManagerFactoryNIO(configuration.getPagingDirectory(), configuration.getPageFileSize()));
+      PagingManager<PageMessage> pagingManager = new PagingManagerImpl<PageMessage>(new PagingManagerFactoryNIO(configuration.getPagingDirectory(), configuration.getPageFileSize()));
       
       postOffice = new PostOfficeImpl(storageManager, pagingManager, 
             queueFactory, configuration.isRequireDestinations());

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -28,8 +28,9 @@
 
 import org.jboss.messaging.core.paging.Page;
 import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.impl.PageMessage;
+import org.jboss.messaging.core.paging.impl.PagingManagerFactoryNIO;
 import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
-import org.jboss.messaging.core.paging.impl.PagingManagerFactoryNIO;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
@@ -59,37 +60,37 @@
    
    public void testPagingManagerNIO() throws Exception
    {
-      PagingManagerImpl managerImpl = 
-         new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir, 1024*1024));
+      PagingManagerImpl<PageMessage> managerImpl = 
+         new PagingManagerImpl<PageMessage>(new PagingManagerFactoryNIO(journalDir, 1024*1024));
       managerImpl.start();
       
-      PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test"));
+      PagingStore<PageMessage> store = managerImpl.getPageStore(new SimpleString("simple-test"));
       
       ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
       
-      assertFalse(store.page(msg));
+      assertFalse(store.page(new PageMessage(msg)));
       
       store.startPaging();
       
-      assertTrue(store.page(msg));
+      assertTrue(store.page(new PageMessage(msg)));
       
-      Page page = store.depage();
+      Page<PageMessage> page = store.depage();
       
       page.open();
       
-      ServerMessage msgs[] = page.read();
+      PageMessage msgs[] = page.read();
       
       page.close();
       
       assertEquals(1, msgs.length);
       
-      assertEqualsByteArrays(msg.getBody().array(), msgs[0].getBody().array());
+      assertEqualsByteArrays(msg.getBody().array(), msgs[0].getMessage().getBody().array());
       
       assertTrue(store.isPaging());
       
       assertNull(store.depage());
       
-      assertFalse(store.page(msg));
+      assertFalse(store.page(new PageMessage(msg)));
    }
    
    // Package protected ---------------------------------------------

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java	2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -63,7 +63,7 @@
       {
          recreateDirectory();
          System.out.println("Test " + i);
-         testConcurrentPaging(new NIOSequentialFileFactory(journalDir), 10);
+         testConcurrentPaging(new NIOSequentialFileFactory(journalDir), 1);
       }
    }
    

Added: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/fakes/FakeManagerFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/fakes/FakeManagerFactory.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/fakes/FakeManagerFactory.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -0,0 +1,63 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.unit.core.paging.fakes;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.paging.impl.PagingManagerFactoryNIO;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+
+public class FakeManagerFactory extends PagingManagerFactoryNIO
+{
+
+   public FakeManagerFactory(long pageSize)
+   {
+      super("", pageSize);
+   }
+   // Constants -----------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+   
+   // Public --------------------------------------------------------
+   
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+
+
+   @Override
+   protected SequentialFileFactory newFileFactory(String destinationDirectory)
+   {
+      return new FakeSequentialFileFactory();
+   }
+   
+   
+   // Private -------------------------------------------------------
+   
+   // Inner classes -------------------------------------------------
+   
+}

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java	2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -23,16 +23,6 @@
 
 package org.jboss.messaging.tests.unit.core.paging.impl;
 
-import java.nio.ByteBuffer;
-
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.jboss.messaging.core.journal.IOCallback;
-import org.jboss.messaging.core.journal.SequentialFile;
-import org.jboss.messaging.core.journal.SequentialFileFactory;
-import org.jboss.messaging.core.paging.impl.PageImpl;
-import org.jboss.messaging.core.remoting.MessagingBuffer;
-import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
 
 
@@ -66,81 +56,7 @@
    
    
    
-   public void testEasyMockPageWithCallback() throws Exception
-   {
-      testEasyMockOnPage(true);
-   }
-   
-   public void testEasyMockPageWithoutCallback() throws Exception
-   {
-      testEasyMockOnPage(false);
-   }
-   
-   private void testEasyMockOnPage(boolean callback) throws Exception
-   {
-      SequentialFileFactory factory = EasyMock.createMock(SequentialFileFactory.class);
-      
-      EasyMock.expect(factory.isSupportsCallbacks()).andStubReturn(callback);
-      
-      SequentialFile file = EasyMock.createStrictMock(SequentialFile.class);
-      
-      EasyMock.replay(factory, file);
-      
-      PageImpl impl = new PageImpl(factory, file, 1);
-      
-      EasyMock.verify(factory, file);
-      
-      EasyMock.reset(factory, file);
-
-      
-      EasyMock.expect(factory.newBuffer(EasyMock.anyInt())).andStubAnswer(new IAnswer<ByteBuffer>() {
-
-         public ByteBuffer answer() throws Throwable
-         {
-            int size = (Integer)EasyMock.getCurrentArguments()[0];
-            return ByteBuffer.allocate(size);
-         }});
-      
-      ServerMessage msg = EasyMock.createMock(ServerMessage.class);
-      EasyMock.expect(msg.getMessageID()).andStubReturn(1l);
-      EasyMock.expect(msg.getEncodeSize()).andStubReturn(2);
-      msg.encode(EasyMock.isA(MessagingBuffer.class));
-      EasyMock.expectLastCall().andAnswer(new IAnswer<Object>(){
-
-         public Object answer() throws Throwable
-         {
-            MessagingBuffer buffer = (MessagingBuffer)EasyMock.getCurrentArguments()[0];
-            buffer.putByte((byte)5);            
-            buffer.putByte((byte)6);            
-            return null;
-         }});
-      
-      final byte [] expectedBytes = autoEncode((byte)'{', (int)10, (long)1, (byte)5, (byte)6, (byte)'}');
-      
-      if (callback)
-      {
-         EasyMock.expect(file.write(compareByteBuffer(expectedBytes), EasyMock.isA(IOCallback.class))).andAnswer(new IAnswer<Integer>(){
-
-            public Integer answer() throws Throwable
-            {
-               IOCallback callback = (IOCallback)EasyMock.getCurrentArguments()[1];
-               callback.done();
-               return expectedBytes.length;
-            }});
-      }
-      else
-      {
-         EasyMock.expect(file.write(compareByteBuffer(expectedBytes), EasyMock.eq(false))).andReturn(expectedBytes.length);
-      }
-      
-      EasyMock.replay(factory, file, msg);
-      
-      impl.write(msg);
-      
-      EasyMock.verify(factory, file, msg);
-      
-   }
-   
+    
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java	2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -29,9 +29,11 @@
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.paging.impl.PageImpl;
+import org.jboss.messaging.core.paging.impl.PageMessage;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.tests.unit.core.paging.fakes.FakeManagerFactory;
 import org.jboss.messaging.tests.util.RandomUtil;
 import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.util.SimpleString;
@@ -64,7 +66,7 @@
       
       SequentialFile file = factory.createSequentialFile("00010.page", 1);
       
-      PageImpl impl = new PageImpl(factory, file, 10);
+      PageImpl<PageMessage> impl = new PageImpl<PageMessage>(factory, file, new FakeManagerFactory(1024), 10);
       
       assertEquals(10, impl.getPageId());
       
@@ -94,7 +96,7 @@
          
          msg.setDestination(simpleDestination);
          
-         impl.write(msg);
+         impl.write(new PageMessage(msg));
          
          assertEquals(i + 1, impl.getNumberOfMessages());
       }
@@ -104,9 +106,9 @@
       
       file = factory.createSequentialFile("00010.page", 1);
       file.open();
-      impl = new PageImpl(factory, file, 10);
+      impl = new PageImpl<PageMessage>(factory, file, new FakeManagerFactory(1024), 10);
       
-      ServerMessage msgs[] = impl.read();
+      PageMessage msgs[] = impl.read();
       
       assertEquals(numberOfElements, msgs.length);
 
@@ -114,11 +116,11 @@
       
       for (int i = 0; i < msgs.length; i++)
       {
-         assertEquals((long)i, msgs[i].getMessageID());
+         assertEquals((long)i, msgs[i].getMessage().getMessageID());
          
-         assertEquals(simpleDestination, msgs[i].getDestination());
+         assertEquals(simpleDestination, msgs[i].getMessage().getDestination());
          
-         assertEqualsByteArrays(buffers.get(i).array(), msgs[i].getBody().array());
+         assertEqualsByteArrays(buffers.get(i).array(), msgs[i].getMessage().getBody().array());
       }
 
       impl.delete();

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -32,6 +32,7 @@
 import org.jboss.messaging.core.paging.PagingStoreFactory;
 import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
 import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
+import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.util.SimpleString;
 
@@ -104,7 +105,7 @@
    
    public void testMultipleThreadsGetStore() throws Exception
    {
-      PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
+      PagingStoreFactory<ServerMessage> spi = EasyMock.createMock(PagingStoreFactory.class);
       final PagingManagerImpl manager = new PagingManagerImpl(spi);
       
       final SimpleString destination = new SimpleString("some-destination");
@@ -113,7 +114,7 @@
       
       EasyMock.expect(factory.listFiles(EasyMock.isA(String.class))).andStubReturn(new ArrayList<String>());
       
-      PagingStoreImpl storeImpl = new PagingStoreImpl(factory, destination, 1);
+      PagingStoreImpl<ServerMessage> storeImpl = new PagingStoreImpl<ServerMessage>(factory, spi, destination, 1);
       
       EasyMock.expect(spi.newStore(destination)).andStubReturn(storeImpl);
       

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -30,10 +30,12 @@
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.paging.Page;
 import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.impl.PageMessage;
 import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
 import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.jboss.messaging.tests.unit.core.paging.fakes.FakeManagerFactory;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -60,7 +62,7 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
       
-      PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 2);
+      PagingStore storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 2);
       
       storeImpl.start();
       
@@ -76,7 +78,7 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
       
-      PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 2);
+      PagingStore storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 2);
       
       storeImpl.start();
       
@@ -103,7 +105,7 @@
       
       storeImpl.sync();
       
-      storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 2);
+      storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 2);
       
       storeImpl.start();
       
@@ -115,7 +117,7 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
       
-      PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 10);
+      PagingStore<PageMessage> storeImpl = new PagingStoreImpl<PageMessage>(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 10);
       
       storeImpl.start();
       
@@ -136,7 +138,7 @@
    
          ServerMessage msg = createMessage(i+1l, destination, buffer);
 
-         assertTrue(storeImpl.page(msg));
+         assertTrue(storeImpl.page(new PageMessage(msg, 0l)));
       }
       
       
@@ -144,11 +146,11 @@
       
       storeImpl.sync();
       
-      Page page = storeImpl.depage();
+      Page<PageMessage> page = storeImpl.depage();
       
       page.open();
       
-      ServerMessage msg[] = page.read();
+      PageMessage msg[] = page.read();
       
       assertEquals(10, msg.length);
       assertEquals(1, storeImpl.getNumberOfPages());
@@ -161,8 +163,8 @@
       
       for (int i = 0; i < 10; i++)
       {
-         assertEquals(i + 1l, msg[i].getMessageID());
-         assertEqualsByteArrays(buffers.get(i).array(), msg[i].getBody().array());
+         assertEquals(i + 1l, msg[i].getMessage().getMessageID());
+         assertEqualsByteArrays(buffers.get(i).array(), msg[i].getMessage().getBody().array());
       }
       
    }
@@ -171,7 +173,7 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
       
-      TestSupportPageStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 10);
+      TestSupportPageStore<PageMessage> storeImpl = new PagingStoreImpl<PageMessage>(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 10);
       
       storeImpl.start();
       
@@ -200,7 +202,7 @@
          
          ServerMessage msg = createMessage(i+1l, destination, buffer);
 
-         assertTrue(storeImpl.page(msg));
+         assertTrue(storeImpl.page(new PageMessage(msg)));
       }
       
       
@@ -210,11 +212,11 @@
       
       for (int pageNr = 0; pageNr < 2; pageNr++)
       {
-         Page page = storeImpl.depage();
+         Page<PageMessage> page = storeImpl.depage();
          
          page.open();
          
-         ServerMessage msg[] = page.read();
+         PageMessage msg[] = page.read();
          
          page.close();
 
@@ -222,8 +224,8 @@
          
          for (int i = 0; i < 5; i++)
          {
-            assertEquals(pageNr*5 + i + 1l, msg[i].getMessageID());
-            assertEqualsByteArrays(buffers.get(pageNr*5 + i).array(), msg[i].getBody().array());
+            assertEquals(pageNr*5 + i + 1l, msg[i].getMessage().getMessageID());
+            assertEqualsByteArrays(buffers.get(pageNr*5 + i).array(), msg[i].getMessage().getBody().array());
          }
       }
       
@@ -233,9 +235,9 @@
 
       ServerMessage msg = createMessage(100, destination, buffers.get(0));
       
-      assertTrue(storeImpl.page(msg));
+      assertTrue(storeImpl.page(new PageMessage(msg)));
       
-      Page newPage = storeImpl.depage();
+      Page<PageMessage> newPage = storeImpl.depage();
       
       newPage.open();
       
@@ -251,23 +253,23 @@
       
       assertFalse(storeImpl.isPaging());
       
-      assertFalse(storeImpl.page(msg));
+      assertFalse(storeImpl.page(new PageMessage(msg)));
       
       storeImpl.startPaging();
 
-      assertTrue(storeImpl.page(msg));
+      assertTrue(storeImpl.page(new PageMessage(msg)));
       
-      Page page = storeImpl.depage();
+      Page<PageMessage> page = storeImpl.depage();
       
       page.open();
       
-      ServerMessage msgs[] = page.read();
+      PageMessage msgs[] = page.read();
       
       assertEquals(1, msgs.length);
       
-      assertEquals(100l, msgs[0].getMessageID());
+      assertEquals(100l, msgs[0].getMessage().getMessageID());
       
-      assertEqualsByteArrays(buffers.get(0).array(), msgs[0].getBody().array());
+      assertEqualsByteArrays(buffers.get(0).array(), msgs[0].getMessage().getBody().array());
       
       assertEquals(1, storeImpl.getNumberOfPages());
       

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-08-21 21:42:55 UTC (rev 4860)
@@ -34,11 +34,13 @@
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.paging.impl.PageMessage;
 import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
 import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.tests.unit.core.paging.fakes.FakeManagerFactory;
 import org.jboss.messaging.tests.util.RandomUtil;
 import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.util.SimpleString;
@@ -76,11 +78,11 @@
       
       final CountDownLatch latchStart = new CountDownLatch(numberOfThreads);
       
-      final ConcurrentHashMap<Long, ServerMessage> buffers = new ConcurrentHashMap<Long, ServerMessage>();
+      final ConcurrentHashMap<Long, PageMessage> buffers = new ConcurrentHashMap<Long, PageMessage>();
       
-      final ArrayList<Page> readPages = new ArrayList<Page>();
+      final ArrayList<Page<PageMessage>> readPages = new ArrayList<Page<PageMessage>>();
       
-      final TestSupportPageStore storeImpl = new PagingStoreImpl(factory, new SimpleString("test"), MAX_SIZE);
+      final TestSupportPageStore<PageMessage> storeImpl = new PagingStoreImpl<PageMessage>(factory, new FakeManagerFactory(0), new SimpleString("test"), MAX_SIZE);
       
       storeImpl.start();
       
@@ -109,7 +111,7 @@
                while (true)
                {
                   long id = messageIdGenerator.incrementAndGet();
-                  ServerMessage msg = createMessage(id, destination, createRandomBuffer(5));
+                  PageMessage msg = new PageMessage(createMessage(id, destination, createRandomBuffer(5)));
                   if (storeImpl.page(msg))
                   {
                      buffers.put(id, msg);
@@ -194,21 +196,21 @@
    
       System.out.println("Reading " + buffers.size() + " messages, " + readPages.size() + " pages");
       
-      final ConcurrentHashMap<Long, ServerMessage> buffers2 = new ConcurrentHashMap<Long, ServerMessage>();
+      final ConcurrentHashMap<Long, PageMessage> buffers2 = new ConcurrentHashMap<Long, PageMessage>();
       
-      for (Page page: readPages)
+      for (Page<PageMessage> page: readPages)
       {
          page.open();
-         ServerMessage msgs[] = page.read();
+         PageMessage msgs[] = page.read();
          page.close();
          
-         for (ServerMessage msg : msgs)
+         for (PageMessage msg : msgs)
          {
-            ServerMessage msgWritten = buffers.remove(msg.getMessageID());
-            buffers2.put(msg.getMessageID(), msg);
+            PageMessage msgWritten = buffers.remove(msg.getMessage().getMessageID());
+            buffers2.put(msg.getMessage().getMessageID(), msg);
             assertNotNull(msgWritten);
-            assertEquals (msg.getDestination(), msgWritten.getDestination());
-            assertEqualsByteArrays(msgWritten.getBody().array(), msg.getBody().array());
+            assertEquals (msg.getMessage().getDestination(), msgWritten.getMessage().getDestination());
+            assertEqualsByteArrays(msgWritten.getMessage().getBody().array(), msg.getMessage().getBody().array());
          }
       }
       
@@ -226,7 +228,7 @@
          fileTmp.close();         
       }
       
-      TestSupportPageStore storeImpl2 = new PagingStoreImpl(factory, new SimpleString("test"), MAX_SIZE);
+      TestSupportPageStore<PageMessage> storeImpl2 = new PagingStoreImpl<PageMessage>(factory, new FakeManagerFactory(0), new SimpleString("test"), MAX_SIZE);
       storeImpl2.start();
       
       int numberOfPages = storeImpl2.getNumberOfPages();
@@ -242,15 +244,15 @@
       assertEquals(numberOfPages, storeImpl2.getNumberOfPages());
       
       long lastMessageId = messageIdGenerator.incrementAndGet();
-      ServerMessage lastMsg = createMessage(lastMessageId, destination, createRandomBuffer(5));
+      PageMessage lastMsg = new PageMessage(createMessage(lastMessageId, destination, createRandomBuffer(5)));
       
       storeImpl2.page(lastMsg);
       buffers2.put(lastMessageId, lastMsg);
       
-      Page lastPage = null;
+      Page<PageMessage> lastPage = null;
       while (true)
       {
-         Page page = storeImpl2.depage();
+         Page<PageMessage> page = storeImpl2.depage();
          if (page == null)
          {
             break;
@@ -260,28 +262,28 @@
          
          page.open();
          
-         ServerMessage[] msgs = page.read();
+         PageMessage[] msgs = page.read();
          
          page.close();
 
-         for (ServerMessage msg: msgs)
+         for (PageMessage msg: msgs)
          {
             
-            ServerMessage msgWritten = buffers2.remove(msg.getMessageID());
+            PageMessage msgWritten = buffers2.remove(msg.getMessage().getMessageID());
             assertNotNull(msgWritten);
-            assertEquals (msg.getDestination(), msgWritten.getDestination());
-            assertEqualsByteArrays(msgWritten.getBody().array(), msg.getBody().array());
+            assertEquals (msg.getMessage().getDestination(), msgWritten.getMessage().getDestination());
+            assertEqualsByteArrays(msgWritten.getMessage().getBody().array(), msg.getMessage().getBody().array());
          }
       }
       
       
       lastPage.open();
-      ServerMessage lastMessages[] = lastPage.read();
+      PageMessage lastMessages[] = lastPage.read();
       lastPage.close();
       assertEquals(1, lastMessages.length);
       
-      assertEquals(lastMessages[0].getMessageID(), lastMessageId);
-      assertEqualsByteArrays(lastMessages[0].getBody().array(), lastMsg.getBody().array());
+      assertEquals(lastMessages[0].getMessage().getMessageID(), lastMessageId);
+      assertEqualsByteArrays(lastMessages[0].getMessage().getBody().array(), lastMsg.getMessage().getBody().array());
       
       assertEquals(0, buffers2.size());
       




More information about the jboss-cvs-commits mailing list