[jboss-cvs] JBoss Messaging SVN: r4867 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/paging and 8 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Aug 25 21:14:35 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-08-25 21:14:35 -0400 (Mon, 25 Aug 2008)
New Revision: 4867

Added:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java
Removed:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransaction.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java
Modified:
   branches/Branch_JBMESSAGING-1314/build-messaging.xml
   branches/Branch_JBMESSAGING-1314/build.xml
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/LastPageRecordImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageTransactionImplTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
Adding transaction support on paging

Modified: branches/Branch_JBMESSAGING-1314/build-messaging.xml
===================================================================
--- branches/Branch_JBMESSAGING-1314/build-messaging.xml	2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/build-messaging.xml	2008-08-26 01:14:35 UTC (rev 4867)
@@ -915,6 +915,26 @@
       </java>
    </target>
 
+   <target name="debugServer" depends="jar">
+      <java classname="org.jboss.messaging.microcontainer.JBMBootstrapServer" fork="true">
+         <jvmarg value="-XX:+UseParallelGC"/>
+         <jvmarg value="-Xms512M"/>
+         <jvmarg value="-Xmx2048M"/>
+         <jvmarg value="-XX:+AggressiveOpts"/>
+         <jvmarg value="-XX:+UseFastAccessorMethods"/>
+         <jvmarg value="-Xdebug"/>
+         <jvmarg value="-Xnoagent"/>
+         <jvmarg value="-Djava.compiler=NONE"/>
+         <jvmarg value="-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/>
+         <jvmarg value="-Dorg.jboss.logging.Logger.pluginClass=org.jboss.messaging.core.logging.JBMLoggerPlugin"/>
+         <jvmarg value="-Djava.library.path=${native.bin.dir}"/>
+         <jvmarg value="-Djava.naming.factory.initial=org.jnp.interfaces.NamingContextFactory"/>
+         <jvmarg value="-Djava.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces"/>
+         <arg line="jbm-standalone-beans.xml"/>
+         <classpath refid="jms.test.execution.classpath"/>
+      </java>
+   </target>
+
    <!-- Examples -->
 
    <target name="queueExample" depends="client-jar">

Modified: branches/Branch_JBMESSAGING-1314/build.xml
===================================================================
--- branches/Branch_JBMESSAGING-1314/build.xml	2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/build.xml	2008-08-26 01:14:35 UTC (rev 4867)
@@ -173,6 +173,10 @@
       <ant antfile="build-messaging.xml" target="runServer"/>
    </target>
 
+   <target name="debugServer" depends="createthirdparty">
+      <ant antfile="build-messaging.xml" target="debugServer"/>
+   </target>
+
    <!--example targets-->
    <target name="queueExample" depends="createthirdparty">
       <ant antfile="build-messaging.xml" target="queueExample"/>

Deleted: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransaction.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransaction.java	2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransaction.java	2008-08-26 01:14:35 UTC (rev 4867)
@@ -1,57 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.paging;
-
-import org.jboss.messaging.core.journal.EncodingSupport;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * 
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- */
-public interface PageTransaction extends EncodingSupport
-{
-
-   boolean isCommitted();
-   
-   void complete();
-
-   long getRecordID();
-
-   void setRecordID(long id);
-
-   long getTransactionID();
-   
-   int addMessage(SimpleString destination);
-   
-   SimpleString[] getDestinations();
-   
-   int decrement();
-   
-   int decrement(int elements);
-   
-   int getNumberOfMessages();
-
-}

Copied: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java (from rev 4865, branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransaction.java)
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java	2008-08-26 01:14:35 UTC (rev 4867)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging;
+
+import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * This records contains information about tra
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public interface PageTransactionInfo extends EncodingSupport
+{
+
+   void waitCompletion() throws InterruptedException;
+   
+   void complete();
+
+   long getRecordID();
+
+   void setRecordID(long id);
+
+   long getTransactionID();
+   
+   int increment();
+   
+   int decrement();
+   
+   int decrement(int elements);
+   
+   int getNumberOfMessages();
+
+}

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java	2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java	2008-08-26 01:14:35 UTC (rev 4867)
@@ -32,8 +32,6 @@
 /**
  * 
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- * @param <T> An Encoding Support.
  */
 public interface Pager
 {
@@ -60,6 +58,22 @@
    boolean page(ServerMessage message) throws Exception;
    
    /**
+    * Page, only if destination is in page mode.
+    * @param message
+    * @return false if destination is not on page mode
+    */
+   boolean page(ServerMessage message, long transactionId) throws Exception;
+   
+   /**
+    * Point to inform/restoring Transactions used when the messages were added into paging
+    * */
+   void addTransaction(PageTransactionInfo pageTransaction);
+   
+   
+   void completeTransaction(long transactionId);
+   
+   
+   /**
     * 
     * Duplication detection for paging processing
     *  */

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-26 01:14:35 UTC (rev 4867)
@@ -50,7 +50,7 @@
    
    void sync() throws Exception;
    
-   boolean page(PageMessage message, Pager pageListener) throws Exception;
+   boolean page(PageMessage message) throws Exception;
    
    /** 
     * Remove the first page from the Writing Queue.

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/LastPageRecordImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/LastPageRecordImpl.java	2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/LastPageRecordImpl.java	2008-08-26 01:14:35 UTC (rev 4867)
@@ -35,7 +35,7 @@
    // Attributes ----------------------------------------------------
    
    
-   private long recordId;
+   private long recordId = 0;
    private SimpleString destination;
    private long lastId;
    
@@ -51,11 +51,9 @@
    {
    }
    
-   public LastPageRecordImpl(long recordId, long lastId,
-         SimpleString destination)
+   public LastPageRecordImpl(long lastId, SimpleString destination)
    {
       super();
-      this.recordId = recordId;
       this.lastId = lastId;
       this.destination = destination;
    }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java	2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java	2008-08-26 01:14:35 UTC (rev 4867)
@@ -49,7 +49,14 @@
    // Public --------------------------------------------------------
 
    private final ServerMessage message;
-
+   private long transactionID;
+   
+   public PageMessage(ServerMessage message, long transactionID)
+   {
+      this.message = message;
+      this.transactionID = transactionID;
+   }
+   
    public PageMessage(ServerMessage message)
    {
       this.message = message;
@@ -65,11 +72,17 @@
       return message;
    }
 
+   public long getTransactionID()
+   {
+      return transactionID;
+   }
    
+   
    // EncodingSupport implementation --------------------------------
 
    public void decode(MessagingBuffer buffer)
    {
+      transactionID = buffer.getLong();
       final long messageID = buffer.getLong();
       message.decode(buffer);
       message.setMessageID(messageID);
@@ -77,6 +90,7 @@
 
    public void encode(MessagingBuffer buffer)
    {
+      buffer.putLong(transactionID);
       buffer.putLong(message.getMessageID());
       message.encode(buffer);
    }
@@ -84,7 +98,7 @@
    public int getEncodeSize()
    {
       
-      return 8 + message.getEncodeSize();
+      return 8 + 8 + message.getEncodeSize();
    }
    
    // Package protected ---------------------------------------------

Deleted: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java	2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java	2008-08-26 01:14:35 UTC (rev 4867)
@@ -1,167 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.paging.impl;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.jboss.messaging.core.paging.PageTransaction;
-import org.jboss.messaging.core.remoting.MessagingBuffer;
-import org.jboss.messaging.util.ConcurrentHashSet;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * 
- * TODO: delete this class!
- * 
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- */
-public class PageTransactionImpl implements PageTransaction
-{
-
-   // Constants -----------------------------------------------------
-   
-   // Attributes ----------------------------------------------------
-   
-   long transactionID;
-   long recordID;
-   boolean committed = false;
-   
-   final AtomicInteger numberOfMessages = new AtomicInteger(0);
-
-   /** This is a transient field, not being persisted */
-   final ConcurrentHashSet<SimpleString> destinations = new ConcurrentHashSet<SimpleString>();
-   
-   
-   // Static --------------------------------------------------------
-   
-   // Constructors --------------------------------------------------
-   
-   public PageTransactionImpl(final long transactionID)
-   {
-      this.transactionID = transactionID;
-   }
-
-   public PageTransactionImpl()
-   {
-   }
-
-   // Public --------------------------------------------------------
-
-   
-   public long getRecordID()
-   {
-      return recordID;
-   }
-   
-   public void setRecordID(long recordID)
-   {
-      this.recordID = recordID;
-   }
-   
-   public long getTransactionID()
-   {
-      return transactionID;
-   }
-   
-   public int addMessage(SimpleString destination)
-   {
-      this.destinations.add(destination);
-      
-      return numberOfMessages.incrementAndGet();
-   }
-   
-   public int decrement()
-   {
-      final int value = numberOfMessages.decrementAndGet();
-      if (value < 0)
-      {
-         throw new IllegalStateException("Internal error Negative value on Paging transactions!");
-      }
-      
-      return value;
-   }
-   
-   public int decrement(int elements)
-   {
-      final int value = numberOfMessages.addAndGet(elements * -1);
-      if (value < 0)
-      {
-         throw new IllegalStateException("Internal error Negative value on Paging transactions!");
-      }
-      
-      return value;
-   }
-   
-   public int getNumberOfMessages()
-   {
-      return numberOfMessages.get();
-   }
-   
-   public SimpleString[] getDestinations()
-   {
-      return destinations.toArray(new SimpleString[destinations.size()]);
-   }
-   
-   // EncodingSupport implementation 
-   
-   public synchronized void decode(final MessagingBuffer buffer)
-   {
-      this.transactionID = buffer.getLong();
-      this.recordID = buffer.getLong();
-      this.numberOfMessages.set(buffer.getInt());
-   }
-   
-   public synchronized void encode(final MessagingBuffer buffer)
-   {
-      this.committed = true; // if it is being readed, certainly it was committed
-      buffer.putLong(this.transactionID);
-      buffer.putLong(this.recordID);
-      buffer.putInt(this.numberOfMessages.get());
-   }
-
-   public synchronized int getEncodeSize()
-   {
-      return 8*2 /* long */ + 4 /* int */;
-   }
-   
-   public boolean isCommitted()
-   {
-      return committed;
-   }
-   
-   public void complete()
-   {
-      committed = true;
-   }
-   
-   // Package protected ---------------------------------------------
-   
-   // Protected -----------------------------------------------------
-   
-   // Private -------------------------------------------------------
-   
-   // Inner classes -------------------------------------------------
-   
-}

Copied: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java (from rev 4866, branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java)
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java	2008-08-26 01:14:35 UTC (rev 4867)
@@ -0,0 +1,165 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging.impl;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.paging.PageTransactionInfo;
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.util.ConcurrentHashSet;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class PageTransactionInfoImpl implements PageTransactionInfo
+{
+
+   // Constants -----------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   private long transactionID;
+   private long recordID;
+   private CountDownLatch countDownCompleted;
+   
+   final AtomicInteger numberOfMessages = new AtomicInteger(0);
+   
+   
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+   
+   public PageTransactionInfoImpl(final long transactionID)
+   {
+      this.transactionID = transactionID;
+      this.countDownCompleted = new CountDownLatch(1);
+   }
+
+   public PageTransactionInfoImpl()
+   {
+   }
+
+   // Public --------------------------------------------------------
+
+   
+   public long getRecordID()
+   {
+      return recordID;
+   }
+   
+   public void setRecordID(long recordID)
+   {
+      this.recordID = recordID;
+   }
+   
+   public long getTransactionID()
+   {
+      return transactionID;
+   }
+   
+   public int increment()
+   {
+      return numberOfMessages.incrementAndGet();
+   }
+   
+   public int decrement()
+   {
+      final int value = numberOfMessages.decrementAndGet();
+      if (value < 0)
+      {
+         throw new IllegalStateException("Internal error Negative value on Paging transactions!");
+      }
+      
+      return value;
+   }
+   
+   public int decrement(int elements)
+   {
+      final int value = numberOfMessages.addAndGet(elements * -1);
+      if (value < 0)
+      {
+         throw new IllegalStateException("Internal error Negative value on Paging transactions!");
+      }
+      
+      return value;
+   }
+   
+   public int getNumberOfMessages()
+   {
+      return numberOfMessages.get();
+   }
+   
+   // EncodingSupport implementation 
+   
+   public synchronized void decode(final MessagingBuffer buffer)
+   {
+      this.transactionID = buffer.getLong();
+      this.numberOfMessages.set(buffer.getInt());
+      this.countDownCompleted = null; // if it is being readed, certainly it was committed
+   }
+   
+   public synchronized void encode(final MessagingBuffer buffer)
+   {
+      buffer.putLong(this.transactionID);
+      buffer.putInt(this.numberOfMessages.get());
+   }
+
+   public synchronized int getEncodeSize()
+   {
+      return 8 /* long */ + 4 /* int */;
+   }
+
+   public void complete()
+   {
+      /** 
+       * this is to avoid a race condition where the transaction still being committed another thread is depaging messages
+       */
+      countDownCompleted.countDown();
+   }
+   
+   /** 
+    * this is to avoid a race condition where the transaction still being committed another thread is depaging messages
+    */
+   public void waitCompletion() throws InterruptedException
+   {
+      if (countDownCompleted != null)
+      {
+         countDownCompleted.await();
+      }
+   }
+   
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+   
+   // Private -------------------------------------------------------
+   
+   // Inner classes -------------------------------------------------
+   
+}

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-26 01:14:35 UTC (rev 4867)
@@ -198,7 +198,7 @@
       
    }
 
-   public boolean page(PageMessage message, Pager pageListener) throws Exception
+   public boolean page(PageMessage message) throws Exception
    {
       validateInit();
       
@@ -208,19 +208,12 @@
       // This would be a synchronized block... (but using a Semaphore)
       synchronizedBlockLock.acquire();
 
+      // The only thing single-threaded done on paging is allocating the bytes on the file
+      // After we have it allocated we keep all the threads working until we need to move to a new file (in which case we demand a writeLock, to wait for the writes to finish)
       try
       {
          if (currentPage == null)
          {
-            if (this.lastRecord != null)
-            {
-               if (pageListener != null)
-               {
-                  pageListener.clearLastRecord(lastRecord);
-               }
-               lastRecord = null;
-            }
-            
             return false;
          }
          
@@ -287,26 +280,34 @@
    
    public boolean startDequeueThread(final Pager listener) throws Exception
    {
-      if (!isPaging())
+      lock.readLock().lock();
+      try
       {
-         return false;
-      }
-      else
-      {
-         synchronized (this)
+         if (currentPage == null)
          {
-            if (this.dequeueThread == null)
+            return false;
+         }
+         else
+         {
+            synchronized (this)
             {
-               this.dequeueThread = new DequeueThread(listener);
-               this.dequeueThread.start();
-               return true;
+               if (this.dequeueThread == null)
+               {
+                  this.dequeueThread = new DequeueThread(listener);
+                  this.dequeueThread.start();
+                  return true;
+               }
+               else
+               {
+                  return false;
+               }
             }
-            else
-            {
-               return false;
-            }
          }
       }
+      finally
+      {
+         lock.readLock().unlock();
+      }
    }
    
    
@@ -552,6 +553,8 @@
                Page page = depage();
                if (page == null)
                {
+                  listener.clearLastRecord(lastRecord);
+                  lastRecord = null;
                   break;
                }
                page.open();

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2008-08-26 01:14:35 UTC (rev 4867)
@@ -26,7 +26,7 @@
 import java.util.Map;
 
 import org.jboss.messaging.core.paging.LastPageRecord;
-import org.jboss.messaging.core.paging.PageTransaction;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.MessageReference;
@@ -74,10 +74,8 @@
    void rollback(long txID) throws Exception;
       
    
-   void storePageTransaction(long txID, PageTransaction pageTransaction) throws Exception;
+   void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception;
 
-   void updatePageTransaction(long txID, PageTransaction pageTransaction) throws Exception;
-
    
    void storeLastPage(long txID, LastPageRecord pageTransaction) throws Exception;
    

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-08-26 01:14:35 UTC (rev 4867)
@@ -48,8 +48,9 @@
 import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.paging.LastPageRecord;
-import org.jboss.messaging.core.paging.PageTransaction;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
 import org.jboss.messaging.core.paging.impl.LastPageRecordImpl;
+import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
@@ -224,19 +225,26 @@
       messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), ADD_MESSAGE, message);
    }
 
-   public void storePageTransaction(long txID, PageTransaction pageTransaction) throws Exception
+   public void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception
    {
+      if (pageTransaction.getRecordID() != 0)
+      {
+         // Instead of updating the record, we delete the old one as that is better for reclaiming
+         messageJournal.appendDeleteRecordTransactional(txID, pageTransaction.getRecordID());
+      }
+      pageTransaction.setRecordID(generateMessageID());
       messageJournal.appendAddRecordTransactional(txID, pageTransaction.getRecordID(), PAGE_TRANSACTION, pageTransaction);
    }
-
-   public void updatePageTransaction(long txID, PageTransaction pageTransaction) throws Exception
-   {
-      messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(), PAGE_TRANSACTION, pageTransaction);
-   }
    
-   public void storeLastPage(long txID, LastPageRecord pageTransaction) throws Exception
+   public void storeLastPage(long txID, LastPageRecord lastPage) throws Exception
    {
-      messageJournal.appendAddRecordTransactional(txID, pageTransaction.getRecordId(), LAST_PAGE, pageTransaction);
+      if (lastPage.getRecordId() != 0)
+      {
+         // To avoid linked list effect on reclaiming, we delete and add a new record, instead of simply updating it
+         messageJournal.appendDeleteRecordTransactional(txID, lastPage.getRecordId());
+      }
+      lastPage.setRecordId(generateMessageID());
+      messageJournal.appendAddRecordTransactional(txID, lastPage.getRecordId(), LAST_PAGE, lastPage);
    }
 
    public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception
@@ -307,6 +315,20 @@
 			
 			switch (recordType)
 			{
+			   case PAGE_TRANSACTION:
+			   {
+               MessagingBuffer buff = new ByteBufferWrapper(bb);
+               
+               PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+               
+               pageTransactionInfo.decode(buff);
+               
+               pageTransactionInfo.setRecordID(record.id);
+               
+               postOffice.getPager().addTransaction(pageTransactionInfo);
+
+               break;
+			   }
 			   case LAST_PAGE:
 			   {
                MessagingBuffer buff = new ByteBufferWrapper(bb);

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2008-08-26 01:14:35 UTC (rev 4867)
@@ -27,7 +27,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.jboss.messaging.core.paging.LastPageRecord;
-import org.jboss.messaging.core.paging.PageTransaction;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
@@ -119,11 +119,11 @@
 	{
 	}
 
-	public void storePageTransaction(long txID, PageTransaction pageTransaction) throws Exception
+	public void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception
    {
    }
 
-   public void updatePageTransaction(long txID, PageTransaction pageTransaction) throws Exception
+   public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception
    {
    }
 

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-08-26 01:14:35 UTC (rev 4867)
@@ -26,6 +26,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -39,6 +40,7 @@
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.paging.LastPageRecord;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
 import org.jboss.messaging.core.paging.Pager;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStore;
@@ -447,7 +449,7 @@
 
    private boolean startDepageThread(PagingStore store) throws Exception
    {
-      return store.startDequeueThread(new PagerImpl());
+      return store.startDequeueThread(this.pager);
    }
    
    
@@ -455,7 +457,7 @@
    private class PagerImpl implements Pager
    {
       
-//      private final ConcurrentMap</*TransactionID*/ Long , PageTransaction> transactions = new ConcurrentHashMap<Long, PageTransaction>();
+      private final ConcurrentMap</*TransactionID*/ Long , PageTransactionInfo> transactions = new ConcurrentHashMap<Long, PageTransactionInfo>();
 
       public void clearLastRecord(LastPageRecord lastRecord) throws Exception
       {
@@ -474,41 +476,80 @@
       {
          log.info("Depaging....");
          
-         long transactionID = storageManager.generateTransactionID();
+         /// Depage has to be done atomically, in case of failure it should be back to where it was
+         final long depageTransactionID = storageManager.generateTransactionID();
          
          LastPageRecord lastPage = pagingStore.getLastRecord(); 
          
-         if (lastPage != null)
+         if (lastPage == null)
          {
+            lastPage = new LastPageRecordImpl(pageId, destination);
+            pagingStore.setLastRecord(lastPage);
+         }
+         else
+         {
             if (pageId <= lastPage.getLastId())
             {
-               log.info("Page " + pageId + " was already processed, ignoring the page");
+               log.warn("Page " + pageId + " was already processed, ignoring the page");
                return true;
             }
-            else
-            {
-               storageManager.storeDeleteTransactional(transactionID, lastPage.getRecordId());
-            }
          }
+
+         lastPage.setLastId(pageId);
+         storageManager.storeLastPage(depageTransactionID, lastPage);
          
-         LastPageRecord record = new LastPageRecordImpl(storageManager.generateMessageID(), pageId, destination);
-         storageManager.storeLastPage(transactionID, record);
-         pagingStore.setLastRecord(record);
-         
+         HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
+
          final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
          
          for (PageMessage msg: data)
          {
+            final long transactionIdDuringPaging = msg.getTransactionID();
+            if (transactionIdDuringPaging > 0)
+            {
+               final PageTransactionInfo pageInfo = transactions.get(transactionIdDuringPaging);
+               if (pageInfo == null)
+               {
+                  // TODO make it .trace
+                  log.warn("Transaction " + msg.getTransactionID() + " not found, ignoring message " + msg.getMessage().getMessageID(), new Exception("trace"));
+                  continue;
+               }
+               
+               // This is to avoid a race condition where messages are depaged before the commit arrived
+               pageInfo.waitCompletion();
+
+               /// Update information about transactions
+               if (msg.getMessage().isDurable())
+               {
+                  pageInfo.decrement();
+                  pageTransactionsToUpdate.add(pageInfo);
+               }
+            }
+
             refsToAdd.addAll(PostOfficeImpl.this.route(msg.getMessage()));
             
             if (msg.getMessage().getDurableRefCount() != 0)
             {
-               storageManager.storeMessageTransactional(transactionID, msg.getMessage());
+               storageManager.storeMessageTransactional(depageTransactionID, msg.getMessage());
             }
          }
          
-         storageManager.commit(transactionID);
          
+         for (PageTransactionInfo pageWithTransaction: pageTransactionsToUpdate)
+         {
+            if (pageWithTransaction.getNumberOfMessages() == 0)
+            { // no more messages.. delete the PageWithTransactionInfo
+               storageManager.storeDeleteTransactional(depageTransactionID, pageWithTransaction.getRecordID());
+               this.transactions.remove(pageWithTransaction.getTransactionID());
+            }
+            else
+            {
+               storageManager.storePageTransaction(depageTransactionID, pageWithTransaction);
+            }
+         }
+         
+         storageManager.commit(depageTransactionID);
+
          for (MessageReference ref : refsToAdd)
          {
             ref.getQueue().addLast(ref);
@@ -519,6 +560,7 @@
       
       public void loadLastPage(LastPageRecord lastPage) throws Exception
       {
+         System.out.println("LastPage loaded was " + lastPage.getLastId());
          pagingManager.getPageStore(lastPage.getDestination()).setLastRecord(lastPage);
       }
 
@@ -554,11 +596,36 @@
          return addSize(message.getDestination(), message.getEncodeSize());      
       }
       
+      public boolean page(ServerMessage message, long transactionId)
+            throws Exception
+      {
+         return pagingManager.getPageStore(message.getDestination()).page(new PageMessage(message, transactionId));
+      }
+
+
       public boolean page(ServerMessage message) throws Exception
       {
-         return pagingManager.getPageStore(message.getDestination()).page(new PageMessage(message), this);
+         return pagingManager.getPageStore(message.getDestination()).page(new PageMessage(message));
       }
 
+      
+      public void addTransaction(PageTransactionInfo pageTransaction)
+      {
+         this.transactions.put(pageTransaction.getTransactionID(), pageTransaction);
+      }
+
+      public void completeTransaction(long transactionId)
+      {
+         PageTransactionInfo pageTrans = this.transactions.get(transactionId);
+         
+         // If nothing was paged.. we just remove the information to avoid memory leaks
+         if (pageTrans.getNumberOfMessages() == 0)
+         {
+            this.transactions.remove(pageTrans);
+         }
+      }
+
+      
       public void sync(Collection<SimpleString> destinationsToSync) throws Exception
       {
          for (SimpleString destination: destinationsToSync)
@@ -568,6 +635,5 @@
       }
 
 
-
    }
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-08-26 01:14:35 UTC (rev 4867)
@@ -34,7 +34,7 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.paging.Pager;
-import org.jboss.messaging.core.paging.PageTransaction;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.MessageReference;
@@ -44,7 +44,7 @@
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.paging.impl.PageMessage;
-import org.jboss.messaging.core.paging.impl.PageTransactionImpl;
+import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -68,6 +68,8 @@
    private final List<MessageReference> acknowledgements = new ArrayList<MessageReference>();
    
    private final List<ServerMessage> pagedMessages = new ArrayList<ServerMessage>();
+   
+   private PageTransactionInfoImpl pageTransaction;
 
    private final Xid xid;
 
@@ -131,7 +133,6 @@
       
       if (pager.isPaging(message.getDestination()))
       {
-
          pagedMessages.add(message);
       }
       else
@@ -206,6 +207,8 @@
          throw new IllegalStateException("Cannot prepare non XA transaction");
       }
 
+      pageMessages();
+      
       if (containsPersistent)
       {
          storageManager.prepare(id);
@@ -243,34 +246,22 @@
          }
       }
       
-      HashSet<SimpleString> pagedDestinationsToSync = new HashSet<SimpleString>();
-
-      for (ServerMessage message: pagedMessages)
+      
+      if (state != State.PREPARED)
       {
-       
-         if (pager.page(message))
-         {
-            if (message.isDurable())
-            {
-               pagedDestinationsToSync.add(message.getDestination());
-            }
-         }
-         else
-         {
-            // This could happen when the PageStore left the pageState 
-            route(message);
-         }
+         pageMessages();
       }
-      
+
       if (containsPersistent)
       {
-         if (pagedDestinationsToSync.size() > 0)
-         {
-            pager.sync(pagedDestinationsToSync);
-         }
          storageManager.commit(id);
       }
 
+      if (pageTransaction != null)
+      {
+         pageTransaction.complete();
+      }
+
       for (MessageReference ref : refsToAdd)
       {
          ref.getQueue().addLast(ref);
@@ -434,10 +425,61 @@
       }
    }
 
+   private void pageMessages() throws Exception
+   {
+      HashSet<SimpleString> pagedDestinationsToSync = new HashSet<SimpleString>();
+
+      boolean pagingPersistent = false;
+
+      if (pagedMessages.size() != 0)
+      {
+         if (pageTransaction == null)
+         {
+            pageTransaction = new PageTransactionInfoImpl(this.id);
+            // To avoid a race condition where depage happens before the transaction is completed, we need to inform the pager about this transaction is being processed 
+            pager.addTransaction(pageTransaction);
+         }
+      }
+
+      
+      for (ServerMessage message: pagedMessages)
+      {
+       
+         if (pager.page(message, id))
+         {
+            // This could happen if the destination was in page mode when the message was added, and it was changed when effectively adding it
+            
+            if (message.isDurable())
+            {
+               pageTransaction.increment();
+               pagingPersistent = true;
+               pagedDestinationsToSync.add(message.getDestination());
+            }
+         }
+         else
+         {
+            // This could happen when the PageStore left the pageState 
+            route(message);
+         }
+      }
+      
+      if (pagingPersistent)
+      {
+         containsPersistent = true;
+         if (pagedDestinationsToSync.size() > 0)
+         {
+            pager.sync(pagedDestinationsToSync);
+            storageManager.storePageTransaction(id, pageTransaction);
+         }
+      }
+   }
+
    private void clear()
    {
       refsToAdd.clear();
 
       acknowledgements.clear();
+      
+      pagedMessages.clear();
    }
 }

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-08-26 01:14:35 UTC (rev 4867)
@@ -68,11 +68,11 @@
       
       ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
       
-      assertFalse(store.page(new PageMessage(msg), null));
+      assertFalse(store.page(new PageMessage(msg)));
       
       store.startPaging();
       
-      assertTrue(store.page(new PageMessage(msg), null));
+      assertTrue(store.page(new PageMessage(msg)));
       
       Page page = store.depage();
       
@@ -90,7 +90,7 @@
       
       assertNull(store.depage());
       
-      assertFalse(store.page(new PageMessage(msg), null));
+      assertFalse(store.page(new PageMessage(msg)));
    }
    
    // Package protected ---------------------------------------------

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageTransactionImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageTransactionImplTest.java	2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageTransactionImplTest.java	2008-08-26 01:14:35 UTC (rev 4867)
@@ -25,8 +25,8 @@
 
 import java.nio.ByteBuffer;
 
-import org.jboss.messaging.core.paging.PageTransaction;
-import org.jboss.messaging.core.paging.impl.PageTransactionImpl;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
+import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
 import org.jboss.messaging.core.remoting.MessagingBuffer;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.tests.util.RandomUtil;
@@ -55,7 +55,7 @@
    {
       long id1 = RandomUtil.randomLong();
       long id2 = RandomUtil.randomLong();
-      PageTransaction trans = new PageTransactionImpl(id2);
+      PageTransactionInfo trans = new PageTransactionInfoImpl(id2);
       
       trans.setRecordID(id1);
       
@@ -67,7 +67,7 @@
       
       for (int i = 0; i < nr1; i++)
       {
-         trans.addMessage(i%2 == 0? str1: str2);
+         trans.increment();
       }
       
       
@@ -79,10 +79,9 @@
       trans.encode(wrapper);
       wrapper.rewind();
       
-      PageTransaction trans2 = new PageTransactionImpl(id1);
+      PageTransactionInfo trans2 = new PageTransactionInfoImpl(id1);
       trans2.decode(wrapper);
       
-      assertEquals(id1, trans2.getRecordID());
       assertEquals(id2, trans2.getTransactionID());
       
       assertEquals(nr1, trans2.getNumberOfMessages());
@@ -101,8 +100,6 @@
       }
       
       
-      assertEquals(2, trans.getDestinations().length);
-      
    }
    
    // Package protected ---------------------------------------------

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-08-26 01:14:35 UTC (rev 4867)
@@ -98,7 +98,7 @@
       
       assertTrue(storeImpl.isPaging());
       
-      assertTrue(storeImpl.page(msg, null));
+      assertTrue(storeImpl.page(msg));
       
       assertEquals(1, storeImpl.getNumberOfPages());
       
@@ -137,7 +137,7 @@
    
          PageMessage msg = createMessage(i+1l, destination, buffer);
 
-         assertTrue(storeImpl.page(msg, null));
+         assertTrue(storeImpl.page(msg));
       }
       
       
@@ -201,7 +201,7 @@
          
          PageMessage msg = createMessage(i+1l, destination, buffer);
 
-         assertTrue(storeImpl.page(msg, null));
+         assertTrue(storeImpl.page(msg));
       }
       
       
@@ -234,7 +234,7 @@
 
       PageMessage msg = createMessage(100, destination, buffers.get(0));
       
-      assertTrue(storeImpl.page(msg, null));
+      assertTrue(storeImpl.page(msg));
       
       Page newPage = storeImpl.depage();
       
@@ -252,11 +252,11 @@
       
       assertFalse(storeImpl.isPaging());
       
-      assertFalse(storeImpl.page(msg, null));
+      assertFalse(storeImpl.page(msg));
       
       storeImpl.startPaging();
 
-      assertTrue(storeImpl.page(msg, null));
+      assertTrue(storeImpl.page(msg));
       
       Page page = storeImpl.depage();
       

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-08-26 01:14:35 UTC (rev 4867)
@@ -112,7 +112,7 @@
                {
                   long id = messageIdGenerator.incrementAndGet();
                   PageMessage msg = createMessage(id, destination, createRandomBuffer(5));
-                  if (storeImpl.page(msg, null))
+                  if (storeImpl.page(msg))
                   {
                      buffers.put(id, msg);
                   }
@@ -246,7 +246,7 @@
       long lastMessageId = messageIdGenerator.incrementAndGet();
       PageMessage lastMsg = createMessage(lastMessageId, destination, createRandomBuffer(5));
       
-      storeImpl2.page(lastMsg, null);
+      storeImpl2.page(lastMsg);
       buffers2.put(lastMessageId, lastMsg);
       
       Page lastPage = null;




More information about the jboss-cvs-commits mailing list