[jboss-cvs] JBoss Messaging SVN: r4867 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/paging and 8 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Aug 25 21:14:35 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-08-25 21:14:35 -0400 (Mon, 25 Aug 2008)
New Revision: 4867
Added:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java
Removed:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransaction.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java
Modified:
branches/Branch_JBMESSAGING-1314/build-messaging.xml
branches/Branch_JBMESSAGING-1314/build.xml
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/LastPageRecordImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageTransactionImplTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
Adding transaction support on paging
Modified: branches/Branch_JBMESSAGING-1314/build-messaging.xml
===================================================================
--- branches/Branch_JBMESSAGING-1314/build-messaging.xml 2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/build-messaging.xml 2008-08-26 01:14:35 UTC (rev 4867)
@@ -915,6 +915,26 @@
</java>
</target>
+ <target name="debugServer" depends="jar">
+ <java classname="org.jboss.messaging.microcontainer.JBMBootstrapServer" fork="true">
+ <jvmarg value="-XX:+UseParallelGC"/>
+ <jvmarg value="-Xms512M"/>
+ <jvmarg value="-Xmx2048M"/>
+ <jvmarg value="-XX:+AggressiveOpts"/>
+ <jvmarg value="-XX:+UseFastAccessorMethods"/>
+ <jvmarg value="-Xdebug"/>
+ <jvmarg value="-Xnoagent"/>
+ <jvmarg value="-Djava.compiler=NONE"/>
+ <jvmarg value="-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/>
+ <jvmarg value="-Dorg.jboss.logging.Logger.pluginClass=org.jboss.messaging.core.logging.JBMLoggerPlugin"/>
+ <jvmarg value="-Djava.library.path=${native.bin.dir}"/>
+ <jvmarg value="-Djava.naming.factory.initial=org.jnp.interfaces.NamingContextFactory"/>
+ <jvmarg value="-Djava.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces"/>
+ <arg line="jbm-standalone-beans.xml"/>
+ <classpath refid="jms.test.execution.classpath"/>
+ </java>
+ </target>
+
<!-- Examples -->
<target name="queueExample" depends="client-jar">
Modified: branches/Branch_JBMESSAGING-1314/build.xml
===================================================================
--- branches/Branch_JBMESSAGING-1314/build.xml 2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/build.xml 2008-08-26 01:14:35 UTC (rev 4867)
@@ -173,6 +173,10 @@
<ant antfile="build-messaging.xml" target="runServer"/>
</target>
+ <target name="debugServer" depends="createthirdparty">
+ <ant antfile="build-messaging.xml" target="debugServer"/>
+ </target>
+
<!--example targets-->
<target name="queueExample" depends="createthirdparty">
<ant antfile="build-messaging.xml" target="queueExample"/>
Deleted: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransaction.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransaction.java 2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransaction.java 2008-08-26 01:14:35 UTC (rev 4867)
@@ -1,57 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.paging;
-
-import org.jboss.messaging.core.journal.EncodingSupport;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- *
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- */
-public interface PageTransaction extends EncodingSupport
-{
-
- boolean isCommitted();
-
- void complete();
-
- long getRecordID();
-
- void setRecordID(long id);
-
- long getTransactionID();
-
- int addMessage(SimpleString destination);
-
- SimpleString[] getDestinations();
-
- int decrement();
-
- int decrement(int elements);
-
- int getNumberOfMessages();
-
-}
Copied: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java (from rev 4865, branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransaction.java)
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java 2008-08-26 01:14:35 UTC (rev 4867)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging;
+
+import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * This record contains information about a transaction that was used while messages were being paged.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public interface PageTransactionInfo extends EncodingSupport
+{
+
+ void waitCompletion() throws InterruptedException;
+
+ void complete();
+
+ long getRecordID();
+
+ void setRecordID(long id);
+
+ long getTransactionID();
+
+ int increment();
+
+ int decrement();
+
+ int decrement(int elements);
+
+ int getNumberOfMessages();
+
+}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java 2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java 2008-08-26 01:14:35 UTC (rev 4867)
@@ -32,8 +32,6 @@
/**
*
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- * @param <T> An Encoding Support.
*/
public interface Pager
{
@@ -60,6 +58,22 @@
boolean page(ServerMessage message) throws Exception;
/**
+ * Page, only if destination is in page mode.
+ * @param message
+ * @return false if destination is not in page mode
+ */
+ boolean page(ServerMessage message, long transactionId) throws Exception;
+
+ /**
+ * Entry point for registering (or restoring, when records are reloaded) the transactions that were used when messages were added into paging
+ * */
+ void addTransaction(PageTransactionInfo pageTransaction);
+
+
+ void completeTransaction(long transactionId);
+
+
+ /**
*
* Duplication detection for paging processing
* */
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-26 01:14:35 UTC (rev 4867)
@@ -50,7 +50,7 @@
void sync() throws Exception;
- boolean page(PageMessage message, Pager pageListener) throws Exception;
+ boolean page(PageMessage message) throws Exception;
/**
* Remove the first page from the Writing Queue.
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/LastPageRecordImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/LastPageRecordImpl.java 2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/LastPageRecordImpl.java 2008-08-26 01:14:35 UTC (rev 4867)
@@ -35,7 +35,7 @@
// Attributes ----------------------------------------------------
- private long recordId;
+ private long recordId = 0;
private SimpleString destination;
private long lastId;
@@ -51,11 +51,9 @@
{
}
- public LastPageRecordImpl(long recordId, long lastId,
- SimpleString destination)
+ public LastPageRecordImpl(long lastId, SimpleString destination)
{
super();
- this.recordId = recordId;
this.lastId = lastId;
this.destination = destination;
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java 2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java 2008-08-26 01:14:35 UTC (rev 4867)
@@ -49,7 +49,14 @@
// Public --------------------------------------------------------
private final ServerMessage message;
-
+ private long transactionID;
+
+ public PageMessage(ServerMessage message, long transactionID)
+ {
+ this.message = message;
+ this.transactionID = transactionID;
+ }
+
public PageMessage(ServerMessage message)
{
this.message = message;
@@ -65,11 +72,17 @@
return message;
}
+ public long getTransactionID()
+ {
+ return transactionID;
+ }
+
// EncodingSupport implementation --------------------------------
public void decode(MessagingBuffer buffer)
{
+ transactionID = buffer.getLong();
final long messageID = buffer.getLong();
message.decode(buffer);
message.setMessageID(messageID);
@@ -77,6 +90,7 @@
public void encode(MessagingBuffer buffer)
{
+ buffer.putLong(transactionID);
buffer.putLong(message.getMessageID());
message.encode(buffer);
}
@@ -84,7 +98,7 @@
public int getEncodeSize()
{
- return 8 + message.getEncodeSize();
+ return 8 + 8 + message.getEncodeSize();
}
// Package protected ---------------------------------------------
Deleted: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java 2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java 2008-08-26 01:14:35 UTC (rev 4867)
@@ -1,167 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.paging.impl;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.jboss.messaging.core.paging.PageTransaction;
-import org.jboss.messaging.core.remoting.MessagingBuffer;
-import org.jboss.messaging.util.ConcurrentHashSet;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- *
- * TODO: delete this class!
- *
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- */
-public class PageTransactionImpl implements PageTransaction
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- long transactionID;
- long recordID;
- boolean committed = false;
-
- final AtomicInteger numberOfMessages = new AtomicInteger(0);
-
- /** This is a transient field, not being persisted */
- final ConcurrentHashSet<SimpleString> destinations = new ConcurrentHashSet<SimpleString>();
-
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public PageTransactionImpl(final long transactionID)
- {
- this.transactionID = transactionID;
- }
-
- public PageTransactionImpl()
- {
- }
-
- // Public --------------------------------------------------------
-
-
- public long getRecordID()
- {
- return recordID;
- }
-
- public void setRecordID(long recordID)
- {
- this.recordID = recordID;
- }
-
- public long getTransactionID()
- {
- return transactionID;
- }
-
- public int addMessage(SimpleString destination)
- {
- this.destinations.add(destination);
-
- return numberOfMessages.incrementAndGet();
- }
-
- public int decrement()
- {
- final int value = numberOfMessages.decrementAndGet();
- if (value < 0)
- {
- throw new IllegalStateException("Internal error Negative value on Paging transactions!");
- }
-
- return value;
- }
-
- public int decrement(int elements)
- {
- final int value = numberOfMessages.addAndGet(elements * -1);
- if (value < 0)
- {
- throw new IllegalStateException("Internal error Negative value on Paging transactions!");
- }
-
- return value;
- }
-
- public int getNumberOfMessages()
- {
- return numberOfMessages.get();
- }
-
- public SimpleString[] getDestinations()
- {
- return destinations.toArray(new SimpleString[destinations.size()]);
- }
-
- // EncodingSupport implementation
-
- public synchronized void decode(final MessagingBuffer buffer)
- {
- this.transactionID = buffer.getLong();
- this.recordID = buffer.getLong();
- this.numberOfMessages.set(buffer.getInt());
- }
-
- public synchronized void encode(final MessagingBuffer buffer)
- {
- this.committed = true; // if it is being readed, certainly it was committed
- buffer.putLong(this.transactionID);
- buffer.putLong(this.recordID);
- buffer.putInt(this.numberOfMessages.get());
- }
-
- public synchronized int getEncodeSize()
- {
- return 8*2 /* long */ + 4 /* int */;
- }
-
- public boolean isCommitted()
- {
- return committed;
- }
-
- public void complete()
- {
- committed = true;
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Copied: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java (from rev 4866, branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java)
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java 2008-08-26 01:14:35 UTC (rev 4867)
@@ -0,0 +1,165 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging.impl;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.paging.PageTransactionInfo;
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.util.ConcurrentHashSet;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class PageTransactionInfoImpl implements PageTransactionInfo
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long transactionID;
+ private long recordID;
+ private CountDownLatch countDownCompleted;
+
+ final AtomicInteger numberOfMessages = new AtomicInteger(0);
+
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public PageTransactionInfoImpl(final long transactionID)
+ {
+ this.transactionID = transactionID;
+ this.countDownCompleted = new CountDownLatch(1);
+ }
+
+ public PageTransactionInfoImpl()
+ {
+ }
+
+ // Public --------------------------------------------------------
+
+
+ public long getRecordID()
+ {
+ return recordID;
+ }
+
+ public void setRecordID(long recordID)
+ {
+ this.recordID = recordID;
+ }
+
+ public long getTransactionID()
+ {
+ return transactionID;
+ }
+
+ public int increment()
+ {
+ return numberOfMessages.incrementAndGet();
+ }
+
+ public int decrement()
+ {
+ final int value = numberOfMessages.decrementAndGet();
+ if (value < 0)
+ {
+ throw new IllegalStateException("Internal error Negative value on Paging transactions!");
+ }
+
+ return value;
+ }
+
+ public int decrement(int elements)
+ {
+ final int value = numberOfMessages.addAndGet(elements * -1);
+ if (value < 0)
+ {
+ throw new IllegalStateException("Internal error Negative value on Paging transactions!");
+ }
+
+ return value;
+ }
+
+ public int getNumberOfMessages()
+ {
+ return numberOfMessages.get();
+ }
+
+ // EncodingSupport implementation
+
+ public synchronized void decode(final MessagingBuffer buffer)
+ {
+ this.transactionID = buffer.getLong();
+ this.numberOfMessages.set(buffer.getInt());
+ this.countDownCompleted = null; // if it is being readed, certainly it was committed
+ }
+
+ public synchronized void encode(final MessagingBuffer buffer)
+ {
+ buffer.putLong(this.transactionID);
+ buffer.putInt(this.numberOfMessages.get());
+ }
+
+ public synchronized int getEncodeSize()
+ {
+ return 8 /* long */ + 4 /* int */;
+ }
+
+ public void complete()
+ {
+ /**
+ * This avoids a race condition where the transaction is still being committed while another thread is already depaging its messages.
+ */
+ countDownCompleted.countDown();
+ }
+
+ /**
+ * This avoids a race condition where the transaction is still being committed while another thread is already depaging its messages.
+ */
+ public void waitCompletion() throws InterruptedException
+ {
+ if (countDownCompleted != null)
+ {
+ countDownCompleted.await();
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-26 01:14:35 UTC (rev 4867)
@@ -198,7 +198,7 @@
}
- public boolean page(PageMessage message, Pager pageListener) throws Exception
+ public boolean page(PageMessage message) throws Exception
{
validateInit();
@@ -208,19 +208,12 @@
// This would be a synchronized block... (but using a Semaphore)
synchronizedBlockLock.acquire();
+ // The only thing single-threaded done on paging is allocating the bytes on the file
+ // After we have it allocated we keep all the threads working until we need to move to a new file (in which case we demand a writeLock, to wait for the writes to finish)
try
{
if (currentPage == null)
{
- if (this.lastRecord != null)
- {
- if (pageListener != null)
- {
- pageListener.clearLastRecord(lastRecord);
- }
- lastRecord = null;
- }
-
return false;
}
@@ -287,26 +280,34 @@
public boolean startDequeueThread(final Pager listener) throws Exception
{
- if (!isPaging())
+ lock.readLock().lock();
+ try
{
- return false;
- }
- else
- {
- synchronized (this)
+ if (currentPage == null)
{
- if (this.dequeueThread == null)
+ return false;
+ }
+ else
+ {
+ synchronized (this)
{
- this.dequeueThread = new DequeueThread(listener);
- this.dequeueThread.start();
- return true;
+ if (this.dequeueThread == null)
+ {
+ this.dequeueThread = new DequeueThread(listener);
+ this.dequeueThread.start();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
}
- else
- {
- return false;
- }
}
}
+ finally
+ {
+ lock.readLock().unlock();
+ }
}
@@ -552,6 +553,8 @@
Page page = depage();
if (page == null)
{
+ listener.clearLastRecord(lastRecord);
+ lastRecord = null;
break;
}
page.open();
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-08-26 01:14:35 UTC (rev 4867)
@@ -26,7 +26,7 @@
import java.util.Map;
import org.jboss.messaging.core.paging.LastPageRecord;
-import org.jboss.messaging.core.paging.PageTransaction;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.MessageReference;
@@ -74,10 +74,8 @@
void rollback(long txID) throws Exception;
- void storePageTransaction(long txID, PageTransaction pageTransaction) throws Exception;
+ void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception;
- void updatePageTransaction(long txID, PageTransaction pageTransaction) throws Exception;
-
void storeLastPage(long txID, LastPageRecord pageTransaction) throws Exception;
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-08-26 01:14:35 UTC (rev 4867)
@@ -48,8 +48,9 @@
import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.LastPageRecord;
-import org.jboss.messaging.core.paging.PageTransaction;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.paging.impl.LastPageRecordImpl;
+import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
@@ -224,19 +225,26 @@
messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), ADD_MESSAGE, message);
}
- public void storePageTransaction(long txID, PageTransaction pageTransaction) throws Exception
+ public void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception
{
+ if (pageTransaction.getRecordID() != 0)
+ {
+ // Instead of updating the record, we delete the old one as that is better for reclaiming
+ messageJournal.appendDeleteRecordTransactional(txID, pageTransaction.getRecordID());
+ }
+ pageTransaction.setRecordID(generateMessageID());
messageJournal.appendAddRecordTransactional(txID, pageTransaction.getRecordID(), PAGE_TRANSACTION, pageTransaction);
}
-
- public void updatePageTransaction(long txID, PageTransaction pageTransaction) throws Exception
- {
- messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(), PAGE_TRANSACTION, pageTransaction);
- }
- public void storeLastPage(long txID, LastPageRecord pageTransaction) throws Exception
+ public void storeLastPage(long txID, LastPageRecord lastPage) throws Exception
{
- messageJournal.appendAddRecordTransactional(txID, pageTransaction.getRecordId(), LAST_PAGE, pageTransaction);
+ if (lastPage.getRecordId() != 0)
+ {
+ // To avoid linked list effect on reclaiming, we delete and add a new record, instead of simply updating it
+ messageJournal.appendDeleteRecordTransactional(txID, lastPage.getRecordId());
+ }
+ lastPage.setRecordId(generateMessageID());
+ messageJournal.appendAddRecordTransactional(txID, lastPage.getRecordId(), LAST_PAGE, lastPage);
}
public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception
@@ -307,6 +315,20 @@
switch (recordType)
{
+ case PAGE_TRANSACTION:
+ {
+ MessagingBuffer buff = new ByteBufferWrapper(bb);
+
+ PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+
+ pageTransactionInfo.decode(buff);
+
+ pageTransactionInfo.setRecordID(record.id);
+
+ postOffice.getPager().addTransaction(pageTransactionInfo);
+
+ break;
+ }
case LAST_PAGE:
{
MessagingBuffer buff = new ByteBufferWrapper(bb);
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-08-26 01:14:35 UTC (rev 4867)
@@ -27,7 +27,7 @@
import java.util.concurrent.atomic.AtomicLong;
import org.jboss.messaging.core.paging.LastPageRecord;
-import org.jboss.messaging.core.paging.PageTransaction;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
@@ -119,11 +119,11 @@
{
}
- public void storePageTransaction(long txID, PageTransaction pageTransaction) throws Exception
+ public void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception
{
}
- public void updatePageTransaction(long txID, PageTransaction pageTransaction) throws Exception
+ public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception
{
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-08-26 01:14:35 UTC (rev 4867)
@@ -26,6 +26,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -39,6 +40,7 @@
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.LastPageRecord;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.paging.Pager;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
@@ -447,7 +449,7 @@
private boolean startDepageThread(PagingStore store) throws Exception
{
- return store.startDequeueThread(new PagerImpl());
+ return store.startDequeueThread(this.pager);
}
@@ -455,7 +457,7 @@
private class PagerImpl implements Pager
{
-// private final ConcurrentMap</*TransactionID*/ Long , PageTransaction> transactions = new ConcurrentHashMap<Long, PageTransaction>();
+ private final ConcurrentMap</*TransactionID*/ Long , PageTransactionInfo> transactions = new ConcurrentHashMap<Long, PageTransactionInfo>();
public void clearLastRecord(LastPageRecord lastRecord) throws Exception
{
@@ -474,41 +476,80 @@
{
log.info("Depaging....");
- long transactionID = storageManager.generateTransactionID();
+ // Depaging has to be done atomically: in case of failure, the state must go back to where it was
+ final long depageTransactionID = storageManager.generateTransactionID();
LastPageRecord lastPage = pagingStore.getLastRecord();
- if (lastPage != null)
+ if (lastPage == null)
{
+ lastPage = new LastPageRecordImpl(pageId, destination);
+ pagingStore.setLastRecord(lastPage);
+ }
+ else
+ {
if (pageId <= lastPage.getLastId())
{
- log.info("Page " + pageId + " was already processed, ignoring the page");
+ log.warn("Page " + pageId + " was already processed, ignoring the page");
return true;
}
- else
- {
- storageManager.storeDeleteTransactional(transactionID, lastPage.getRecordId());
- }
}
+
+ lastPage.setLastId(pageId);
+ storageManager.storeLastPage(depageTransactionID, lastPage);
- LastPageRecord record = new LastPageRecordImpl(storageManager.generateMessageID(), pageId, destination);
- storageManager.storeLastPage(transactionID, record);
- pagingStore.setLastRecord(record);
-
+ HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
+
final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
for (PageMessage msg: data)
{
+ final long transactionIdDuringPaging = msg.getTransactionID();
+ if (transactionIdDuringPaging > 0)
+ {
+ final PageTransactionInfo pageInfo = transactions.get(transactionIdDuringPaging);
+ if (pageInfo == null)
+ {
+ // TODO make it .trace
+ log.warn("Transaction " + msg.getTransactionID() + " not found, ignoring message " + msg.getMessage().getMessageID(), new Exception("trace"));
+ continue;
+ }
+
+ // This is to avoid a race condition where messages are depaged before the commit arrived
+ pageInfo.waitCompletion();
+
+ /// Update information about transactions
+ if (msg.getMessage().isDurable())
+ {
+ pageInfo.decrement();
+ pageTransactionsToUpdate.add(pageInfo);
+ }
+ }
+
refsToAdd.addAll(PostOfficeImpl.this.route(msg.getMessage()));
if (msg.getMessage().getDurableRefCount() != 0)
{
- storageManager.storeMessageTransactional(transactionID, msg.getMessage());
+ storageManager.storeMessageTransactional(depageTransactionID, msg.getMessage());
}
}
- storageManager.commit(transactionID);
+ for (PageTransactionInfo pageWithTransaction: pageTransactionsToUpdate)
+ {
+ if (pageWithTransaction.getNumberOfMessages() == 0)
+ { // no more messages.. delete the PageWithTransactionInfo
+ storageManager.storeDeleteTransactional(depageTransactionID, pageWithTransaction.getRecordID());
+ this.transactions.remove(pageWithTransaction.getTransactionID());
+ }
+ else
+ {
+ storageManager.storePageTransaction(depageTransactionID, pageWithTransaction);
+ }
+ }
+
+ storageManager.commit(depageTransactionID);
+
for (MessageReference ref : refsToAdd)
{
ref.getQueue().addLast(ref);
@@ -519,6 +560,7 @@
public void loadLastPage(LastPageRecord lastPage) throws Exception
{
+ System.out.println("LastPage loaded was " + lastPage.getLastId());
pagingManager.getPageStore(lastPage.getDestination()).setLastRecord(lastPage);
}
@@ -554,11 +596,36 @@
return addSize(message.getDestination(), message.getEncodeSize());
}
+ /**
+ * Wraps the message together with the given transaction ID in a PageMessage and hands it
+ * to the page store of the message's destination.
+ *
+ * @param message the message to page; its destination selects the page store
+ * @param transactionId the transaction ID recorded in the PageMessage
+ * @return the result of the store's page call (presumably true when the message was
+ * actually paged; the transactional caller routes the message normally on false)
+ * @throws Exception propagated from the paging manager or the page store
+ */
+ public boolean page(ServerMessage message, long transactionId)
+ throws Exception
+ {
+ return pagingManager.getPageStore(message.getDestination()).page(new PageMessage(message, transactionId));
+ }
+
+
public boolean page(ServerMessage message) throws Exception
{
- return pagingManager.getPageStore(message.getDestination()).page(new PageMessage(message), this);
+ return pagingManager.getPageStore(message.getDestination()).page(new PageMessage(message));
}
+
+ /**
+ * Registers a page transaction in the in-memory transactions map, keyed by its
+ * transaction ID, so it can later be looked up by that ID.
+ *
+ * @param pageTransaction the info to register; replaces any entry already stored under the same ID
+ */
+ public void addTransaction(PageTransactionInfo pageTransaction)
+ {
+ this.transactions.put(pageTransaction.getTransactionID(), pageTransaction);
+ }
+
+ /**
+ * Cleans up the in-memory bookkeeping for a transaction: when the PageTransactionInfo
+ * stored under the given ID reports zero messages, its entry is removed from the
+ * transactions map.
+ *
+ * @param transactionId the key the info was stored under (addTransaction uses getTransactionID())
+ */
+ public void completeTransaction(long transactionId)
+ {
+ PageTransactionInfo pageTrans = this.transactions.get(transactionId);
+
+ // Nothing is stored under this ID (never added, or already removed): nothing to clean up
+ if (pageTrans == null)
+ {
+ return;
+ }
+
+ // If nothing was paged.. we just remove the information to avoid memory leaks
+ if (pageTrans.getNumberOfMessages() == 0)
+ {
+ // The map is keyed by transaction ID; passing the value object would never match an entry
+ this.transactions.remove(transactionId);
+ }
+ }
+
+
public void sync(Collection<SimpleString> destinationsToSync) throws Exception
{
for (SimpleString destination: destinationsToSync)
@@ -568,6 +635,5 @@
}
-
}
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-08-26 01:14:35 UTC (rev 4867)
@@ -34,7 +34,7 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.Pager;
-import org.jboss.messaging.core.paging.PageTransaction;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.MessageReference;
@@ -44,7 +44,7 @@
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.paging.impl.PageMessage;
-import org.jboss.messaging.core.paging.impl.PageTransactionImpl;
+import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
import org.jboss.messaging.util.SimpleString;
/**
@@ -68,6 +68,8 @@
private final List<MessageReference> acknowledgements = new ArrayList<MessageReference>();
private final List<ServerMessage> pagedMessages = new ArrayList<ServerMessage>();
+
+ private PageTransactionInfoImpl pageTransaction;
private final Xid xid;
@@ -131,7 +133,6 @@
if (pager.isPaging(message.getDestination()))
{
-
pagedMessages.add(message);
}
else
@@ -206,6 +207,8 @@
throw new IllegalStateException("Cannot prepare non XA transaction");
}
+ pageMessages();
+
if (containsPersistent)
{
storageManager.prepare(id);
@@ -243,34 +246,22 @@
}
}
- HashSet<SimpleString> pagedDestinationsToSync = new HashSet<SimpleString>();
-
- for (ServerMessage message: pagedMessages)
+
+ if (state != State.PREPARED)
{
-
- if (pager.page(message))
- {
- if (message.isDurable())
- {
- pagedDestinationsToSync.add(message.getDestination());
- }
- }
- else
- {
- // This could happen when the PageStore left the pageState
- route(message);
- }
+ pageMessages();
}
-
+
if (containsPersistent)
{
- if (pagedDestinationsToSync.size() > 0)
- {
- pager.sync(pagedDestinationsToSync);
- }
storageManager.commit(id);
}
+ if (pageTransaction != null)
+ {
+ pageTransaction.complete();
+ }
+
for (MessageReference ref : refsToAdd)
{
ref.getQueue().addLast(ref);
@@ -434,10 +425,61 @@
}
}
+ /**
+ * Attempts to page every message collected in pagedMessages under this transaction's ID.
+ * Messages for which the pager returns false are routed instead. When at least one durable
+ * message was paged, the transaction is marked as containing persistent data, the affected
+ * destinations are synced and the page transaction record is stored.
+ *
+ * @throws Exception propagated from the pager, from routing, or from the storage manager
+ */
+ private void pageMessages() throws Exception
+ {
+ HashSet<SimpleString> pagedDestinationsToSync = new HashSet<SimpleString>();
+
+ // Becomes true once at least one durable message has actually been paged
+ boolean pagingPersistent = false;
+
+ // The page transaction record is created lazily, only when there is something to page
+ if (pagedMessages.size() != 0)
+ {
+ if (pageTransaction == null)
+ {
+ pageTransaction = new PageTransactionInfoImpl(this.id);
+ // To avoid a race condition where depage happens before the transaction is completed, the pager must be told that this transaction is in progress
+ pager.addTransaction(pageTransaction);
+ }
+ }
+
+
+ for (ServerMessage message: pagedMessages)
+ {
+
+ if (pager.page(message, id))
+ {
+ // The message was paged. Only durable messages are counted in the page transaction and have their destination queued for sync
+
+ if (message.isDurable())
+ {
+ pageTransaction.increment();
+ pagingPersistent = true;
+ pagedDestinationsToSync.add(message.getDestination());
+ }
+ }
+ else
+ {
+ // pager.page returned false. This could happen when the PageStore left the page state after the message was added to pagedMessages; route it normally instead
+ route(message);
+ }
+ }
+
+ if (pagingPersistent)
+ {
+ // Paged durable messages make this transaction persistent as far as the containsPersistent checks are concerned
+ containsPersistent = true;
+ // NOTE(review): the set cannot be empty here, since pagingPersistent is only set together with an add to it
+ if (pagedDestinationsToSync.size() > 0)
+ {
+ // Sync the destinations first, then store the page transaction record under this transaction's ID
+ pager.sync(pagedDestinationsToSync);
+ storageManager.storePageTransaction(id, pageTransaction);
+ }
+ }
+ }
+
private void clear()
{
refsToAdd.clear();
acknowledgements.clear();
+
+ pagedMessages.clear();
}
}
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-08-26 01:14:35 UTC (rev 4867)
@@ -68,11 +68,11 @@
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
- assertFalse(store.page(new PageMessage(msg), null));
+ assertFalse(store.page(new PageMessage(msg)));
store.startPaging();
- assertTrue(store.page(new PageMessage(msg), null));
+ assertTrue(store.page(new PageMessage(msg)));
Page page = store.depage();
@@ -90,7 +90,7 @@
assertNull(store.depage());
- assertFalse(store.page(new PageMessage(msg), null));
+ assertFalse(store.page(new PageMessage(msg)));
}
// Package protected ---------------------------------------------
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageTransactionImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageTransactionImplTest.java 2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageTransactionImplTest.java 2008-08-26 01:14:35 UTC (rev 4867)
@@ -25,8 +25,8 @@
import java.nio.ByteBuffer;
-import org.jboss.messaging.core.paging.PageTransaction;
-import org.jboss.messaging.core.paging.impl.PageTransactionImpl;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
+import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
import org.jboss.messaging.core.remoting.MessagingBuffer;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.tests.util.RandomUtil;
@@ -55,7 +55,7 @@
{
long id1 = RandomUtil.randomLong();
long id2 = RandomUtil.randomLong();
- PageTransaction trans = new PageTransactionImpl(id2);
+ PageTransactionInfo trans = new PageTransactionInfoImpl(id2);
trans.setRecordID(id1);
@@ -67,7 +67,7 @@
for (int i = 0; i < nr1; i++)
{
- trans.addMessage(i%2 == 0? str1: str2);
+ trans.increment();
}
@@ -79,10 +79,9 @@
trans.encode(wrapper);
wrapper.rewind();
- PageTransaction trans2 = new PageTransactionImpl(id1);
+ PageTransactionInfo trans2 = new PageTransactionInfoImpl(id1);
trans2.decode(wrapper);
- assertEquals(id1, trans2.getRecordID());
assertEquals(id2, trans2.getTransactionID());
assertEquals(nr1, trans2.getNumberOfMessages());
@@ -101,8 +100,6 @@
}
- assertEquals(2, trans.getDestinations().length);
-
}
// Package protected ---------------------------------------------
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-08-26 01:14:35 UTC (rev 4867)
@@ -98,7 +98,7 @@
assertTrue(storeImpl.isPaging());
- assertTrue(storeImpl.page(msg, null));
+ assertTrue(storeImpl.page(msg));
assertEquals(1, storeImpl.getNumberOfPages());
@@ -137,7 +137,7 @@
PageMessage msg = createMessage(i+1l, destination, buffer);
- assertTrue(storeImpl.page(msg, null));
+ assertTrue(storeImpl.page(msg));
}
@@ -201,7 +201,7 @@
PageMessage msg = createMessage(i+1l, destination, buffer);
- assertTrue(storeImpl.page(msg, null));
+ assertTrue(storeImpl.page(msg));
}
@@ -234,7 +234,7 @@
PageMessage msg = createMessage(100, destination, buffers.get(0));
- assertTrue(storeImpl.page(msg, null));
+ assertTrue(storeImpl.page(msg));
Page newPage = storeImpl.depage();
@@ -252,11 +252,11 @@
assertFalse(storeImpl.isPaging());
- assertFalse(storeImpl.page(msg, null));
+ assertFalse(storeImpl.page(msg));
storeImpl.startPaging();
- assertTrue(storeImpl.page(msg, null));
+ assertTrue(storeImpl.page(msg));
Page page = storeImpl.depage();
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-08-23 05:37:51 UTC (rev 4866)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-08-26 01:14:35 UTC (rev 4867)
@@ -112,7 +112,7 @@
{
long id = messageIdGenerator.incrementAndGet();
PageMessage msg = createMessage(id, destination, createRandomBuffer(5));
- if (storeImpl.page(msg, null))
+ if (storeImpl.page(msg))
{
buffers.put(id, msg);
}
@@ -246,7 +246,7 @@
long lastMessageId = messageIdGenerator.incrementAndGet();
PageMessage lastMsg = createMessage(lastMessageId, destination, createRandomBuffer(5));
- storeImpl2.page(lastMsg, null);
+ storeImpl2.page(lastMsg);
buffers2.put(lastMessageId, lastMsg);
Page lastPage = null;
More information about the jboss-cvs-commits
mailing list