[jboss-cvs] JBoss Messaging SVN: r4861 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/paging/impl and 10 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Aug 22 00:06:45 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-08-22 00:06:45 -0400 (Fri, 22 Aug 2008)
New Revision: 4861
Added:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java
Removed:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/DepageListener.java
Modified:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransaction.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageTransactionImplTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
First attempt on transactions and paging
Deleted: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/DepageListener.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/DepageListener.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/DepageListener.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -1,55 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.paging;
-
-import org.jboss.messaging.core.journal.EncodingSupport;
-import org.jboss.messaging.util.SimpleString;
-
-public interface DepageListener<T extends EncodingSupport>
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- /**
- * @return false if the listener can't handle more pages
- */
- boolean onDepage(SimpleString destination, T[] data) throws Exception;
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -23,7 +23,7 @@
package org.jboss.messaging.core.paging;
-import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.core.paging.impl.PageMessage;
/**
*
@@ -34,14 +34,14 @@
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public interface Page<T extends EncodingSupport>
+public interface Page
{
int getPageId();
- void write(T message) throws Exception;
+ void write(PageMessage message) throws Exception;
- T[] read() throws Exception;
+ PageMessage[] read() throws Exception;
int getSize();
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransaction.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransaction.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PageTransaction.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -34,18 +34,24 @@
public interface PageTransaction extends EncodingSupport
{
+ boolean isCommitted();
+
+ void complete();
+
long getRecordID();
+ void setRecordID(long id);
+
long getTransactionID();
- void setTransactionID(long transactionID);
+ int addMessage(SimpleString destination);
- void addMessage(SimpleString destination);
+ SimpleString[] getDestinations();
- void decrementMessage(SimpleString destination, int numberOfMessages);
+ int decrement();
- int getSize(SimpleString destination);
+ int decrement(int elements);
- boolean isEmpty();
+ int getSize();
}
Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -0,0 +1,102 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging;
+
+import org.jboss.messaging.core.paging.impl.PageMessage;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ * @param <T> An Encoding Support.
+ * TODO: After we have the Paging system stable, maybe we can remove the generic part
+ */
+public interface Pager
+{
+
+ /**
+ * @return false if the listener can't handle more pages
+ */
+ boolean onDepage(SimpleString destination, PageMessage[] data) throws Exception;
+
+ /**
+ * Depage could depage messages for transactions not committed yet,
+ * So we need to eventually send pending messages
+ * */
+ void beginTransaction(long transactionID, PageTransaction pageTransaction);
+
+ /**
+ * Depage could depage messages for transactions not committed yet,
+ * So we need to eventually send pending messages
+ * */
+ void commitTransaction(long transactionID, PageTransaction pageTransaction);
+
+ /**
+ * Need to clear any information about the transaction, any eventual data will be ignored
+ * */
+ void rollbackTransaction(long transactionID);
+
+
+
+ /**
+ * To be used by transactions only.
+ * If you're sure you will page if isPaging, just call the method page and look at its return.
+ * @param destination
+ * @return
+ */
+ boolean isPaging(SimpleString destination) throws Exception;
+
+ /**
+ * Page, only if destination is in page mode.
+ * @param message
+ * @return false if destination is not on page mode
+ */
+ boolean page(ServerMessage message) throws Exception;
+
+ /**
+ * Page, only if destination is in page mode.
+ *
+ * page is an atomic operation. It's better to call page and get test the return.
+ *
+ * @param message
+ * @return false if destination is not on page mode
+ */
+ boolean page(ServerMessage message, long transactionID) throws Exception;
+
+ /**
+ *
+ * To be called when there are no more references to the message
+ * @param message
+ */
+ void messageDone(ServerMessage message) throws Exception;
+
+ /** To be called when a rollback is called after messageDone was called */
+ long addSize(ServerMessage message) throws Exception;
+
+ void sync(PageTransaction pageTransaction) throws Exception;
+
+
+}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -22,7 +22,6 @@
package org.jboss.messaging.core.paging;
-import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.server.MessagingComponent;
import org.jboss.messaging.util.SimpleString;
@@ -33,7 +32,7 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
-public interface PagingManager<T extends EncodingSupport> extends MessagingComponent
+public interface PagingManager extends MessagingComponent
{
- public PagingStore<T> getPageStore(SimpleString storeName) throws Exception;
+ public PagingStore getPageStore(SimpleString storeName) throws Exception;
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -23,10 +23,9 @@
package org.jboss.messaging.core.paging;
-import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.paging.impl.PageMessage;
import org.jboss.messaging.core.server.MessagingComponent;
import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.core.journal.EncodingSupport;
/**
*
@@ -37,7 +36,7 @@
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public interface PagingStore<T extends EncodingSupport> extends MessagingComponent
+public interface PagingStore extends MessagingComponent
{
int getNumberOfPages();
@@ -51,7 +50,7 @@
void sync() throws Exception;
- boolean page(T message) throws Exception;
+ boolean page(PageMessage message) throws Exception;
/**
* Remove the first page from the Writing Queue.
@@ -60,7 +59,7 @@
* @return
* @throws Exception
*/
- Page<T> depage() throws Exception;
+ Page depage() throws Exception;
/**
*
@@ -68,6 +67,6 @@
* @return false if a thread was already started, or if not in page mode
* @throws Exception
*/
- boolean startDequeueThread(DepageListener<T> listener) throws Exception;
+ boolean startDequeueThread(Pager listener) throws Exception;
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -23,7 +23,7 @@
package org.jboss.messaging.core.paging;
-import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.core.paging.impl.PageMessage;
/**
@@ -32,11 +32,11 @@
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public interface PagingStoreFactory<T extends EncodingSupport>
+public interface PagingStoreFactory
{
- PagingStore<T> newStore(org.jboss.messaging.util.SimpleString destinationName);
- T newElement();
- T[] newArray(int size);
+ PagingStore newStore(org.jboss.messaging.util.SimpleString destinationName);
+ PageMessage newElement();
+ PageMessage[] newArray(int size);
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -27,7 +27,6 @@
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
-import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
@@ -41,7 +40,7 @@
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public class PageImpl<T extends EncodingSupport> implements Page<T>
+public class PageImpl implements Page
{
// Constants -----------------------------------------------------
@@ -60,7 +59,7 @@
// Attributes ----------------------------------------------------
private final int pageId;
- private final PagingStoreFactory<T> storeFactory;
+ private final PagingStoreFactory storeFactory;
private final AtomicInteger numberOfMessages = new AtomicInteger(0);
private final SequentialFile file;
private final SequentialFileFactory fileFactory;
@@ -71,7 +70,7 @@
// Constructors --------------------------------------------------
- public PageImpl(final SequentialFileFactory factory, final SequentialFile file, PagingStoreFactory<T> storeFactory, final int pageId) throws Exception
+ public PageImpl(final SequentialFileFactory factory, final SequentialFile file, PagingStoreFactory storeFactory, final int pageId) throws Exception
{
this.pageId = pageId;
this.file = file;
@@ -99,10 +98,10 @@
return pageId;
}
- public T[] read() throws Exception
+ public PageMessage[] read() throws Exception
{
- ArrayList<T> messages = new ArrayList<T>();
+ ArrayList<PageMessage> messages = new ArrayList<PageMessage>();
ByteBuffer buffer = fileFactory.newBuffer((int)file.size());
file.position(0);
@@ -124,7 +123,7 @@
int oldPos = buffer.position();
if (buffer.position() + messageSize < buffer.limit() && buffer.get(oldPos + messageSize) == END_BYTE)
{
- T msg = instantiateObject();
+ PageMessage msg = instantiateObject();
msg.decode(messageBuffer);
messages.add(msg);
}
@@ -145,7 +144,7 @@
return messages.toArray(instantiateArray(messages.size()));
}
- public void write(final T message) throws Exception
+ public void write(final PageMessage message) throws Exception
{
ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + SIZE_RECORD);
buffer.put(START_BYTE);
@@ -212,13 +211,13 @@
// Protected -----------------------------------------------------
- protected T instantiateObject()
+ protected PageMessage instantiateObject()
{
return storeFactory.newElement();
}
- protected T[] instantiateArray(final int size)
+ protected PageMessage[] instantiateArray(final int size)
{
return storeFactory.newArray(size);
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageTransactionImpl.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -23,12 +23,11 @@
package org.jboss.messaging.core.paging.impl;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.messaging.core.paging.PageTransaction;
import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.util.ConcurrentHashSet;
import org.jboss.messaging.util.SimpleString;
/**
@@ -43,28 +42,29 @@
// Attributes ----------------------------------------------------
- final long recordID;
-
long transactionID;
+ long recordID;
+ boolean committed = false;
- final Map<SimpleString, AtomicInteger> destinations = new HashMap<SimpleString, AtomicInteger>();
+ final AtomicInteger numberOfMessages = new AtomicInteger(0);
+
+ /** This is a transient field, not being persisted */
+ final ConcurrentHashSet<SimpleString> destinations = new ConcurrentHashSet<SimpleString>();
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public PageTransactionImpl(final long recordID, final long transactionID)
+ public PageTransactionImpl(final long transactionID)
{
- this.recordID = recordID;
this.transactionID = transactionID;
}
- public PageTransactionImpl(final long recordID)
+ public PageTransactionImpl()
{
- this.recordID = recordID;
}
-
+
// Public --------------------------------------------------------
@@ -73,109 +73,87 @@
return recordID;
}
+ public void setRecordID(long recordID)
+ {
+ this.recordID = recordID;
+ }
+
public long getTransactionID()
{
return transactionID;
}
- public void setTransactionID(final long transactionID)
+ public int addMessage(SimpleString destination)
{
- this.transactionID = transactionID;
+ this.destinations.add(destination);
+
+ return numberOfMessages.incrementAndGet();
}
- public synchronized void addMessage(final SimpleString destination)
+ public int decrement()
{
- AtomicInteger value = destinations.get(destination);
- if (value == null)
+ final int value = numberOfMessages.decrementAndGet();
+ if (value < 0)
{
- destinations.put(destination, new AtomicInteger(1));
+ throw new IllegalStateException("Internal error Negative value on Paging transactions!");
}
- else
- {
- value.incrementAndGet();
- }
+
+ return value;
}
- public synchronized void decrementMessage(final SimpleString destination, final int numberOfMessages)
+ public int decrement(int elements)
{
- AtomicInteger value = destinations.get(destination);
- if (value == null)
+ final int value = numberOfMessages.addAndGet(elements * -1);
+ if (value < 0)
{
- throw new IllegalStateException("Can't find counter for destination " + destination);
+ throw new IllegalStateException("Internal error Negative value on Paging transactions!");
}
- if (value.addAndGet(numberOfMessages * -1) < 0)
- {
- throw new IllegalStateException("Negative value on destination " + destination + " at PageTransaction");
- }
+
+ return value;
}
- public synchronized int getSize(final SimpleString destination)
+ public int getSize()
{
- AtomicInteger value = destinations.get(destination);
- if (value == null)
- {
- return 0;
- }
- else
- {
- return value.intValue();
- }
+ return numberOfMessages.get();
}
- public synchronized boolean isEmpty()
+ public SimpleString[] getDestinations()
{
- for(Map.Entry<SimpleString, AtomicInteger> element: destinations.entrySet())
- {
- if (element.getValue().intValue() != 0)
- {
- return false;
- }
- }
-
- return true;
+ return destinations.toArray(new SimpleString[destinations.size()]);
}
-
// EncodingSupport implementation
public synchronized void decode(final MessagingBuffer buffer)
{
this.transactionID = buffer.getLong();
- final int numberOfElements = buffer.getInt();
- destinations.clear();
-
- for (int i = 0; i < numberOfElements; i++)
- {
- SimpleString str = buffer.getSimpleString();
- AtomicInteger numberOfMessages = new AtomicInteger(buffer.getInt());
- destinations.put(str, numberOfMessages);
- }
+ this.recordID = buffer.getLong();
+ this.numberOfMessages.set(buffer.getInt());
}
public synchronized void encode(final MessagingBuffer buffer)
{
-
+ this.committed = true; // if it is being readed, certainly it was committed
buffer.putLong(this.transactionID);
- buffer.putInt(destinations.size());
-
- for(Map.Entry<SimpleString, AtomicInteger> element: destinations.entrySet())
- {
- buffer.putSimpleString(element.getKey());
- buffer.putInt(element.getValue().intValue());
- }
+ buffer.putLong(this.recordID);
+ buffer.putInt(this.numberOfMessages.get());
}
public synchronized int getEncodeSize()
{
- int size = 0;
- for(Map.Entry<SimpleString, AtomicInteger> element: destinations.entrySet())
- {
- size += SimpleString.sizeofString(element.getKey());
- }
-
- return size + destinations.size() * 4 + 4 + 8;
+ return 8*2 /* long */ + 4 /* int */;
}
+ public boolean isCommitted()
+ {
+ return committed;
+ }
+
+ public void complete()
+ {
+ committed = true;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -37,7 +37,7 @@
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public class PagingManagerFactoryNIO implements PagingStoreFactory<PageMessage>
+public class PagingManagerFactoryNIO implements PagingStoreFactory
{
// Constants -----------------------------------------------------
@@ -59,13 +59,13 @@
// Public --------------------------------------------------------
- public PagingStore<PageMessage> newStore(SimpleString destinationName)
+ public PagingStore newStore(SimpleString destinationName)
{
final String destinationDirectory = directory + "/" + destinationName.toString();
File destinationFile = new File(destinationDirectory);
destinationFile.mkdirs();
- return new PagingStoreImpl<PageMessage>(newFileFactory(destinationDirectory), this, destinationName, pageSize);
+ return new PagingStoreImpl(newFileFactory(destinationDirectory), this, destinationName, pageSize);
}
public PageMessage[] newArray(int size)
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -26,7 +26,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.paging.PagingStoreFactory;
@@ -37,7 +36,7 @@
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public class PagingManagerImpl<T extends EncodingSupport> implements PagingManager<T>
+public class PagingManagerImpl implements PagingManager
{
// Constants -----------------------------------------------------
@@ -45,32 +44,32 @@
private volatile boolean started = false;
- private final ConcurrentMap<SimpleString, PagingStore<T>> stores = new ConcurrentHashMap<SimpleString, PagingStore<T>>();
+ private final ConcurrentMap<SimpleString, PagingStore> stores = new ConcurrentHashMap<SimpleString, PagingStore>();
- private final PagingStoreFactory<T> pagingSPI;
+ private final PagingStoreFactory pagingSPI;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public PagingManagerImpl(final PagingStoreFactory<T> pagingSPI)
+ public PagingManagerImpl(final PagingStoreFactory pagingSPI)
{
this.pagingSPI = pagingSPI;
}
// Public --------------------------------------------------------
- public PagingStore<T> getPageStore(final SimpleString storeName) throws Exception
+ public PagingStore getPageStore(final SimpleString storeName) throws Exception
{
validateStarted();
- PagingStore<T> store = stores.get(storeName);
+ PagingStore store = stores.get(storeName);
if (store == null)
{
store = newStore(storeName);
- PagingStore<T> oldStore = stores.putIfAbsent(storeName, store);
+ PagingStore oldStore = stores.putIfAbsent(storeName, store);
if (oldStore != null)
{
@@ -99,7 +98,7 @@
{
this.started = false;
- for (PagingStore<T> store: stores.values())
+ for (PagingStore store: stores.values())
{
store.stop();
}
@@ -112,7 +111,7 @@
// Private -------------------------------------------------------
- private PagingStore<T> newStore(final SimpleString destinationName)
+ private PagingStore newStore(final SimpleString destinationName)
{
return pagingSPI.newStore(destinationName);
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -34,7 +34,7 @@
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.paging.DepageListener;
+import org.jboss.messaging.core.paging.Pager;
import org.jboss.messaging.core.paging.Page;
import org.jboss.messaging.core.paging.PagingStoreFactory;
import org.jboss.messaging.util.SimpleString;
@@ -44,7 +44,7 @@
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public class PagingStoreImpl<T extends EncodingSupport> implements TestSupportPageStore<T>
+public class PagingStoreImpl implements TestSupportPageStore
{
// Constants -----------------------------------------------------
@@ -58,7 +58,7 @@
private final SimpleString storeName;
- private final PagingStoreFactory<T> storeFactory;
+ private final PagingStoreFactory storeFactory;
private final SequentialFileFactory fileFactory;
@@ -70,7 +70,7 @@
private volatile int numberOfPages;
private volatile int firstPageId = Integer.MAX_VALUE;
private volatile int currentPageId;
- private volatile Page<T> currentPage;
+ private volatile Page currentPage;
// This is supposed to perform better than synchronized methods
private final Semaphore globalLock = new Semaphore(1);
@@ -84,7 +84,7 @@
// Constructors --------------------------------------------------
- public PagingStoreImpl(final SequentialFileFactory fileFactory, PagingStoreFactory<T> storeFactory, final SimpleString storeName, final long maxPageSize)
+ public PagingStoreImpl(final SequentialFileFactory fileFactory, PagingStoreFactory storeFactory, final SimpleString storeName, final long maxPageSize)
{
this.storeFactory = storeFactory;
this.fileFactory = fileFactory;
@@ -121,10 +121,12 @@
return storeName;
}
- public Page<T> depage() throws Exception
+ /** It returns one of the Pages. It doesn't perform reading by itself. */
+ public Page depage() throws Exception
{
validateInit();
+ // Read needs both global and writeLock
globalLock.acquire();
lock.writeLock().lock();
@@ -140,7 +142,7 @@
numberOfPages--;
- final Page<T> returnPage;
+ final Page returnPage;
if (currentPageId == firstPageId)
{
firstPageId = Integer.MAX_VALUE;
@@ -188,7 +190,7 @@
}
- public boolean page(T message) throws Exception
+ public boolean page(PageMessage message) throws Exception
{
validateInit();
@@ -266,7 +268,7 @@
}
}
- public boolean startDequeueThread(final DepageListener<T> listener) throws Exception
+ public boolean startDequeueThread(final Pager listener) throws Exception
{
if (!isPaging())
{
@@ -453,7 +455,7 @@
}
- private Page<T> createPage(int page) throws Exception
+ private Page createPage(int page) throws Exception
{
String fileName = createFileName(page);
SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
@@ -471,7 +473,7 @@
file.close();
- return new PageImpl<T>(fileFactory, file, storeFactory, page);
+ return new PageImpl(fileFactory, file, storeFactory, page);
}
/**
@@ -503,9 +505,9 @@
class DequeueThread extends Thread
{
- final DepageListener<T> listener;
+ final Pager listener;
- public DequeueThread(final DepageListener<T> listener)
+ public DequeueThread(final Pager listener)
{
this.listener = listener;
}
@@ -518,14 +520,14 @@
boolean needMorePages = false;
do
{
- Page<T> page = depage();
+ Page page = depage();
if (page == null)
{
break;
}
page.open();
- T messages[] = page.read();
- listener.onDepage(PagingStoreImpl.this.storeName, messages);
+ PageMessage messages[] = page.read();
+ needMorePages = listener.onDepage(PagingStoreImpl.this.storeName, messages);
page.delete();
}
while (needMorePages);
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -25,14 +25,12 @@
import org.jboss.messaging.core.paging.PagingStore;
-import org.jboss.messaging.core.journal.EncodingSupport;
-
/**
* All the methods required to TestCases on PageStoreImpl
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public interface TestSupportPageStore<T extends EncodingSupport> extends PagingStore<T>
+public interface TestSupportPageStore extends PagingStore
{
void forceAnotherPage() throws Exception;
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
+import org.jboss.messaging.core.paging.PageTransaction;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.MessageReference;
@@ -72,6 +73,12 @@
void rollback(long txID) throws Exception;
+ void storePageTransaction(long txID, PageTransaction pageTransaction) throws Exception;
+
+ void updatePageTransaction(long txID, PageTransaction pageTransaction) throws Exception;
+
+
+
void updateDeliveryCount(MessageReference ref) throws Exception;
void loadMessages(PostOffice postOffice, Map<Long, Queue> queues) throws Exception;
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -47,6 +47,7 @@
import org.jboss.messaging.core.journal.impl.JournalImpl;
import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.PageTransaction;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
@@ -96,6 +97,8 @@
public static final byte UPDATE_DELIVERY_COUNT = 33;
+ public static final byte PAGE_TRANSACTION = 34;
+
public static final byte SET_SCHEDULED_DELIVERY_TIME = 44;
private final AtomicLong messageIDSequence = new AtomicLong(0);
@@ -216,7 +219,17 @@
{
messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), ADD_MESSAGE, message);
}
-
+
+ public void storePageTransaction(long txID, PageTransaction pageTransaction) throws Exception
+ {
+ messageJournal.appendAddRecordTransactional(txID, pageTransaction.getRecordID(), PAGE_TRANSACTION, pageTransaction);
+ }
+
+ public void updatePageTransaction(long txID, PageTransaction pageTransaction) throws Exception
+ {
+ messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(), PAGE_TRANSACTION, pageTransaction);
+ }
+
public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception
{
EncodingSupport record = ackBytes(queueID, messageID);
@@ -245,6 +258,7 @@
}
// Other operations
+
public void updateDeliveryCount(final MessageReference ref) throws Exception
{
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import org.jboss.messaging.core.paging.PageTransaction;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
@@ -117,7 +118,15 @@
{
}
- public void updateDeliveryCount(MessageReference ref) throws Exception
+ public void storePageTransaction(long txID, PageTransaction pageTransaction) throws Exception
+ {
+ }
+
+ public void updatePageTransaction(long txID, PageTransaction pageTransaction) throws Exception
+ {
+ }
+
+ public void updateDeliveryCount(MessageReference ref) throws Exception
{
}
@@ -155,5 +164,5 @@
{
return started;
}
-
+
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -27,6 +27,8 @@
import java.util.Set;
import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.paging.Pager;
+import org.jboss.messaging.core.paging.impl.PageMessage;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.MessagingComponent;
import org.jboss.messaging.core.server.ServerMessage;
@@ -80,33 +82,8 @@
Map<SimpleString, List<Binding>> getMappings();
Set<SimpleString> listAllDestinations();
-
- void routeAndDeliver(final ServerMessage msg) throws Exception;
+ Pager getPager();
- /**
- * To be used by transactions only.
- * If you're sure you will page if isPaging, just call the method page and look at its return.
- * @param destination
- * @return
- */
- boolean isPaging(SimpleString destination) throws Exception;
- /**
- * Page, only if destination is in page mode.
- * @param message
- * @return false if destination is not on page mode
- */
- boolean page(ServerMessage message) throws Exception;
-
- /**
- *
- * To be called when there are no more references to the message
- * @param message
- */
- void messageDone(ServerMessage message) throws Exception;
-
- /** To be called when a rollback is called after messageDone was called */
- long addSize(ServerMessage message) throws Exception;
-
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -37,7 +38,8 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.paging.DepageListener;
+import org.jboss.messaging.core.paging.Pager;
+import org.jboss.messaging.core.paging.PageTransaction;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.paging.impl.PageMessage;
@@ -63,7 +65,7 @@
public class PostOfficeImpl implements PostOffice
{
- private static final long MAX_SIZE = 100 * 1024 * 1024;
+ private static final long MAX_SIZE = 1000000;
private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
@@ -87,12 +89,14 @@
private final StorageManager storageManager;
- private final PagingManager<PageMessage> pagingManager;
+ private final PagingManager pagingManager;
+ private final Pager pager = new PagerImpl();
+
private volatile boolean started;
public PostOfficeImpl(final StorageManager storageManager,
- final PagingManager<PageMessage> pagingManager,
+ final PagingManager pagingManager,
final QueueFactory queueFactory, final boolean checkAllowable)
{
this.storageManager = storageManager;
@@ -225,25 +229,12 @@
return nameMap.get(queueName);
}
- public void routeAndDeliver(final ServerMessage message) throws Exception
+ public List<MessageReference> route(final ServerMessage message) throws Exception
{
- List<MessageReference> refs = this.route(message);
- if (message.getDurableRefCount() != 0)
+ if (pager.addSize(message) > MAX_SIZE)
{
- storageManager.storeMessage(message);
- }
-
- for (MessageReference ref : refs)
- {
- ref.getQueue().addLast(ref);
- }
- }
-
- public List<MessageReference> route(final ServerMessage message) throws Exception
- {
- if (addSize(message.getDestination(), message.getEncodeSize()) > MAX_SIZE)
- {
+ // TODO: move this inside the Pager
PagingStore store = pagingManager.getPageStore(message.getDestination());
if (store.startPaging())
@@ -307,35 +298,11 @@
// }
// }
-
- public boolean isPaging(SimpleString destination) throws Exception
+ public Pager getPager()
{
- return pagingManager.getPageStore(destination).isPaging();
+ return this.pager;
}
-
- public void messageDone(ServerMessage message) throws Exception
- {
- final long size = addSize(message.getDestination(), message.getEncodeSize() * -1);
-
- if (size < MAX_SIZE)
- {
- PagingStore<PageMessage> store = pagingManager.getPageStore(message.getDestination());
-
- startDepageThread(store);
- }
- }
-
- /** To be called when a rollback is called after messageDone was called */
- public long addSize(ServerMessage message) throws Exception
- {
- return addSize(message.getDestination(), message.getEncodeSize());
- }
-
- public boolean page(ServerMessage message) throws Exception
- {
- return pagingManager.getPageStore(message.getDestination()).page(new PageMessage(message));
- }
public Map<SimpleString, List<Binding>> getMappings()
{
@@ -355,12 +322,6 @@
// Private -----------------------------------------------------------------
- private long addSize(SimpleString destination, long size)
- {
- totalSize.addAndGet(size);
- return getQueueSize(destination).addAndGet(size);
- }
-
private AtomicLong getQueueSize(SimpleString destination)
{
AtomicLong size = this.queueSize.get(destination);
@@ -477,28 +438,169 @@
for (SimpleString destination: dests)
{
- PagingStore<PageMessage> store = pagingManager.getPageStore(destination);
+ PagingStore store = pagingManager.getPageStore(destination);
startDepageThread(store);
}
}
- private void startDepageThread(PagingStore<PageMessage> store) throws Exception
+ private void startDepageThread(PagingStore store) throws Exception
{
- store.startDequeueThread(new PagingListener());
+ store.startDequeueThread(new PagerImpl());
}
- private class PagingListener implements DepageListener<PageMessage>
+ // TODO this probably will become a separate class?
+ private class PagerImpl implements Pager
{
+
+ private final ConcurrentMap</*TransactionID*/ Long , PageTransaction> transactions = new ConcurrentHashMap<Long, PageTransaction>();
+ /**
+ * This method will remove files from the page system and add them into the journal, doing it transactionally
+ *
+ * A Transaction will be opened only if persistent messages are used.
+ * If persistent messages are also used, it will update eventual PageTransactions
+ */
public boolean onDepage(final SimpleString destination, final PageMessage[] data) throws Exception
{
+ log.info("Depaging....");
+ long transactionID = storageManager.generateTransactionID();
+ boolean usedTransaction = false;
+
+ HashSet<PageTransaction> pageTransactionsToUpdate = new HashSet<PageTransaction>();
+
+ final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
+
for (PageMessage msg: data)
{
- routeAndDeliver(msg.getMessage());
+ PageTransaction trUsed = null;
+ if (msg.getTransactionID() > 0)
+ {
+ trUsed = transactions.get(msg.getTransactionID());
+ if (trUsed == null)
+ {
+ // TODO make it .trace
+ log.info("Transaction " + msg.getTransactionID() + " not found, ignoring message " + msg.getMessage().getMessageID());
+ continue;
+ }
+ else if (!trUsed.isCommitted())
+ {
+ log.info("Transaction " + msg.getTransactionID() + " is pending... we don't know what to do yet... ignoring the message for now but this is not acceptable");
+ continue;
+ }
+ }
+
+ if (msg.getMessage().isDurable() && trUsed != null)
+ {
+ pageTransactionsToUpdate.add(trUsed);
+ trUsed.decrement();
+ }
+
+ refsToAdd.addAll(PostOfficeImpl.this.route(msg.getMessage()));
+
+ if (msg.getMessage().getDurableRefCount() != 0)
+ {
+ usedTransaction = true;
+ storageManager.storeMessageTransactional(transactionID, msg.getMessage());
+ }
}
+
+ for (PageTransaction pageTrans: pageTransactionsToUpdate)
+ {
+ if (pageTrans.getSize() == 0)
+ {
+ storageManager.storeDelete(pageTrans.getRecordID());
+ }
+ else
+ {
+ storageManager.updatePageTransaction(transactionID, pageTrans);
+ }
+ usedTransaction = true;
+ }
+
+ if (usedTransaction)
+ {
+ storageManager.commit(transactionID);
+ }
+
+ for (MessageReference ref : refsToAdd)
+ {
+ ref.getQueue().addLast(ref);
+ }
+
+
return PostOfficeImpl.this.getQueueSize(destination).get() < MAX_SIZE;
}
+
+ // Transaction
+ public void beginTransaction(long transactionID, PageTransaction pageTransaction)
+ {
+ transactions.putIfAbsent(transactionID, pageTransaction);
+ }
+
+ public void commitTransaction(long transactionID, PageTransaction pageTransaction)
+ {
+ transactions.putIfAbsent(transactionID, pageTransaction);
+ // TODO: What to do with pending transactions during depage?
+ }
+
+ public void rollbackTransaction(long transactionID)
+ {
+ transactions.remove(transactionID);
+ }
+
+ public boolean isPaging(SimpleString destination) throws Exception
+ {
+ return pagingManager.getPageStore(destination).isPaging();
+ }
+
+ public void messageDone(ServerMessage message) throws Exception
+ {
+ final long size = addSize(message.getDestination(), message.getEncodeSize() * -1);
+
+ if (size < MAX_SIZE)
+ {
+ System.out.println("Starting depage Thread, size = " + size);
+ PagingStore store = pagingManager.getPageStore(message.getDestination());
+
+ startDepageThread(store);
+ }
+ }
+
+ private long addSize(SimpleString destination, long size)
+ {
+ totalSize.addAndGet(size);
+ return getQueueSize(destination).addAndGet(size);
+ }
+
+ /** To be called when a rollback is called after messageDone was called */
+ public long addSize(ServerMessage message) throws Exception
+ {
+ return addSize(message.getDestination(), message.getEncodeSize());
+ }
+
+ public boolean page(ServerMessage message) throws Exception
+ {
+ return pagingManager.getPageStore(message.getDestination()).page(new PageMessage(message));
+ }
+
+ public boolean page(ServerMessage message, long transactionID) throws Exception
+ {
+ return pagingManager.getPageStore(message.getDestination()).page(new PageMessage(message, transactionID));
+ }
+
+
+ public void sync(PageTransaction pageTransaction) throws Exception
+ {
+ SimpleString[] destinations = pageTransaction.getDestinations();
+ for (SimpleString destination: destinations)
+ {
+ pagingManager.getPageStore(destination).sync();
+ }
+ }
+
+
+
}
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -172,7 +172,7 @@
scheduledExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), new JBMThreadFactory("JBM-scheduled-threads"));
queueFactory = new QueueFactoryImpl(scheduledExecutor, queueSettingsRepository);
- PagingManager<PageMessage> pagingManager = new PagingManagerImpl<PageMessage>(new PagingManagerFactoryNIO(configuration.getPagingDirectory(), configuration.getPageFileSize()));
+ PagingManager pagingManager = new PagingManagerImpl(new PagingManagerFactoryNIO(configuration.getPagingDirectory(), configuration.getPageFileSize()));
postOffice = new PostOfficeImpl(storageManager, pagingManager,
queueFactory, configuration.isRequireDestinations());
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -39,6 +39,8 @@
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.filter.impl.FilterImpl;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.Pager;
+import org.jboss.messaging.core.paging.impl.PageMessage;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.FlowController;
@@ -128,6 +130,8 @@
private final ResourceManager resourceManager;
private final PostOffice postOffice;
+
+ private final Pager pager;
private final SecurityStore securityStore;
@@ -161,6 +165,8 @@
this.postOffice = postOffice;
+ this.pager = postOffice.getPager();
+
this.queueSettingsRepository = queueSettingsRepository;
this.resourceManager = resourceManager;
@@ -306,7 +312,7 @@
if (autoCommitSends)
{
- if (!postOffice.page(msg))
+ if (!pager.page(msg))
{
List<MessageReference> refs = postOffice.route(msg);
@@ -1143,8 +1149,12 @@
if (message.decrementRefCount() == 0)
{
- postOffice.messageDone(message);
+ pager.messageDone(message);
}
+ else
+ {
+ System.out.println("Still " + message.getRefCount());
+ }
if (message.isDurable() && queue.isDurable())
{
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -22,8 +22,18 @@
package org.jboss.messaging.core.transaction.impl;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.transaction.xa.Xid;
+
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.Pager;
+import org.jboss.messaging.core.paging.PageTransaction;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.MessageReference;
@@ -32,11 +42,9 @@
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.Transaction;
-import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.core.paging.impl.PageMessage;
+import org.jboss.messaging.core.paging.impl.PageTransactionImpl;
-import javax.transaction.xa.Xid;
-import java.util.*;
-
/**
* A TransactionImpl
*
@@ -46,14 +54,19 @@
{
private static final Logger log = Logger.getLogger(TransactionImpl.class);
+
private final StorageManager storageManager;
private final PostOffice postOffice;
+
+ private final Pager pager;
private final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
private final List<MessageReference> acknowledgements = new ArrayList<MessageReference>();
+ private PageTransaction pageTransaction;
+
private final Xid xid;
private final long id;
@@ -70,6 +83,15 @@
this.storageManager = storageManager;
this.postOffice = postOffice;
+
+ if (postOffice == null)
+ {
+ pager = null;
+ }
+ else
+ {
+ this.pager = postOffice.getPager();
+ }
this.xid = null;
@@ -82,6 +104,8 @@
this.storageManager = storageManager;
this.postOffice = postOffice;
+
+ this.pager = postOffice.getPager();
this.xid = xid;
@@ -102,16 +126,31 @@
{
throw new IllegalStateException("Transaction is in invalid state " + state);
}
+
+ if (pager.page(message, this.id))
+ {
+ if (message.isDurable())
+ {
+ containsPersistent = true;
+ getPageTransaction().addMessage(message.getDestination());
+ }
+ else
+ {
+ getPageTransaction();
+ }
+ }
+ else
+ {
+ List<MessageReference> refs = postOffice.route(message);
- List<MessageReference> refs = postOffice.route(message);
+ refsToAdd.addAll(refs);
- refsToAdd.addAll(refs);
+ if (message.getDurableRefCount() != 0)
+ {
+ storageManager.storeMessageTransactional(id, message);
- if (message.getDurableRefCount() != 0)
- {
- storageManager.storeMessageTransactional(id, message);
-
- containsPersistent = true;
+ containsPersistent = true;
+ }
}
}
@@ -129,9 +168,9 @@
if (message.decrementRefCount() == 0)
{
- if (postOffice != null)
+ if (pager != null)
{
- postOffice.messageDone(message);
+ pager.messageDone(message);
}
}
@@ -183,6 +222,10 @@
if (containsPersistent)
{
+ if (this.pageTransaction != null)
+ {
+ storageManager.storePageTransaction(this.id, pageTransaction);
+ }
storageManager.prepare(id);
}
@@ -218,11 +261,25 @@
}
}
+
if (containsPersistent)
{
+ if (this.pageTransaction != null && state != State.PREPARED)
+ {
+ storageManager.storePageTransaction(this.id, pageTransaction);
+ pager.sync(pageTransaction);
+ }
storageManager.commit(id);
}
+
+ // TODO: What to do if depage happen on the middle of transaction not committed yet?
+ // This would be a problem on any solution applied on transactions & paging
+ if (pageTransaction != null)
+ {
+ pager.commitTransaction(id, pageTransaction);
+ }
+
for (MessageReference ref : refsToAdd)
{
ref.getQueue().addLast(ref);
@@ -259,6 +316,11 @@
{
storageManager.rollback(id);
}
+
+ if (pageTransaction != null)
+ {
+ pager.rollbackTransaction(id);
+ }
Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
@@ -281,7 +343,7 @@
// Putting back the size to control paging
if (message.incrementRefCount() == 1)
{
- postOffice.addSize(message);
+ pager.addSize(message);
}
message.incrementRefCount();
@@ -372,6 +434,19 @@
// Private
// -------------------------------------------------------------------
+ private PageTransaction getPageTransaction()
+ {
+ if (pageTransaction == null)
+ {
+ long pageTRID = storageManager.generateMessageID();
+ pageTransaction = new PageTransactionImpl(id);
+ pageTransaction.setRecordID(pageTRID);
+ pager.beginTransaction(id, pageTransaction);
+ }
+
+ return pageTransaction;
+ }
+
private void clear()
{
refsToAdd.clear();
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -60,11 +60,11 @@
public void testPagingManagerNIO() throws Exception
{
- PagingManagerImpl<PageMessage> managerImpl =
- new PagingManagerImpl<PageMessage>(new PagingManagerFactoryNIO(journalDir, 1024*1024));
+ PagingManagerImpl managerImpl =
+ new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir, 1024*1024));
managerImpl.start();
- PagingStore<PageMessage> store = managerImpl.getPageStore(new SimpleString("simple-test"));
+ PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test"));
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
@@ -74,7 +74,7 @@
assertTrue(store.page(new PageMessage(msg)));
- Page<PageMessage> page = store.depage();
+ Page page = store.depage();
page.open();
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -28,6 +28,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.paging.Pager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.FlowController;
import org.jboss.messaging.core.postoffice.PostOffice;
@@ -161,6 +162,19 @@
{
throw new IllegalStateException("Not implemented!");
}
+
+ public boolean page(ServerMessage message, long transactionID)
+ throws Exception
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public Pager getPager()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -66,7 +66,7 @@
SequentialFile file = factory.createSequentialFile("00010.page", 1);
- PageImpl<PageMessage> impl = new PageImpl<PageMessage>(factory, file, new FakeManagerFactory(1024), 10);
+ PageImpl impl = new PageImpl(factory, file, new FakeManagerFactory(1024), 10);
assertEquals(10, impl.getPageId());
@@ -106,7 +106,7 @@
file = factory.createSequentialFile("00010.page", 1);
file.open();
- impl = new PageImpl<PageMessage>(factory, file, new FakeManagerFactory(1024), 10);
+ impl = new PageImpl(factory, file, new FakeManagerFactory(1024), 10);
PageMessage msgs[] = impl.read();
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -32,7 +32,6 @@
import org.jboss.messaging.core.paging.PagingStoreFactory;
import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
-import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
@@ -105,7 +104,7 @@
public void testMultipleThreadsGetStore() throws Exception
{
- PagingStoreFactory<ServerMessage> spi = EasyMock.createMock(PagingStoreFactory.class);
+ PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
final PagingManagerImpl manager = new PagingManagerImpl(spi);
final SimpleString destination = new SimpleString("some-destination");
@@ -114,7 +113,7 @@
EasyMock.expect(factory.listFiles(EasyMock.isA(String.class))).andStubReturn(new ArrayList<String>());
- PagingStoreImpl<ServerMessage> storeImpl = new PagingStoreImpl<ServerMessage>(factory, spi, destination, 1);
+ PagingStoreImpl storeImpl = new PagingStoreImpl(factory, spi, destination, 1);
EasyMock.expect(spi.newStore(destination)).andStubReturn(storeImpl);
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageTransactionImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageTransactionImplTest.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageTransactionImplTest.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -55,26 +55,23 @@
{
long id1 = RandomUtil.randomLong();
long id2 = RandomUtil.randomLong();
- PageTransaction trans = new PageTransactionImpl(id1, id2);
+ PageTransaction trans = new PageTransactionImpl(id2);
- SimpleString dest1 = RandomUtil.randomSimpleString();
- SimpleString dest2 = RandomUtil.randomSimpleString();
+ trans.setRecordID(id1);
+
+ // anything between 2 and 100
+ int nr1 = RandomUtil.randomPositiveInt()%98 + 2;
- int nr1 = RandomUtil.randomPositiveInt()%100;
- int nr2 = RandomUtil.randomPositiveInt()%100;
-
+ SimpleString str1 = RandomUtil.randomSimpleString();
+ SimpleString str2 = RandomUtil.randomSimpleString();
+
for (int i = 0; i < nr1; i++)
{
- trans.addMessage(dest1);
+ trans.addMessage(i%2 == 0? str1: str2);
}
- for (int i = 0; i < nr2; i++)
- {
- trans.addMessage(dest2);
- }
- assertEquals(nr1, trans.getSize(dest1));
- assertEquals(nr2, trans.getSize(dest2));
+ assertEquals(nr1, trans.getSize());
ByteBuffer buffer = ByteBuffer.allocate(trans.getEncodeSize());
MessagingBuffer wrapper = new ByteBufferWrapper(buffer);
@@ -88,28 +85,24 @@
assertEquals(id1, trans2.getRecordID());
assertEquals(id2, trans2.getTransactionID());
- assertEquals(nr1, trans2.getSize(dest1));
- assertEquals(nr2, trans2.getSize(dest2));
+ assertEquals(nr1, trans2.getSize());
- trans.decrementMessage(dest1, nr1);
- trans2.decrementMessage(dest1, nr1);
+ trans.decrement(nr1);
- trans.decrementMessage(dest2, nr2);
- trans2.decrementMessage(dest2, nr2);
+ assertEquals(0, trans.getSize());
- assertTrue(trans.isEmpty());
- assertTrue(trans.isEmpty());
-
-
try
{
- trans.decrementMessage(dest1, 1000);
+ trans.decrement();
fail("Exception expected!");
}
catch (Throwable ignored)
{
}
+
+ assertEquals(2, trans.getDestinations().length);
+
}
// Package protected ---------------------------------------------
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -33,7 +33,6 @@
import org.jboss.messaging.core.paging.impl.PageMessage;
import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
-import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.jboss.messaging.tests.unit.core.paging.fakes.FakeManagerFactory;
import org.jboss.messaging.util.SimpleString;
@@ -95,7 +94,7 @@
buffers.add(buffer);
SimpleString destination = new SimpleString("test");
- ServerMessage msg = createMessage(1l, destination, buffer);
+ PageMessage msg = createMessage(1l, destination, buffer);
assertTrue(storeImpl.isPaging());
@@ -117,7 +116,7 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- PagingStore<PageMessage> storeImpl = new PagingStoreImpl<PageMessage>(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 10);
+ PagingStore storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 10);
storeImpl.start();
@@ -136,9 +135,9 @@
buffers.add(buffer);
- ServerMessage msg = createMessage(i+1l, destination, buffer);
+ PageMessage msg = createMessage(i+1l, destination, buffer);
- assertTrue(storeImpl.page(new PageMessage(msg, 0l)));
+ assertTrue(storeImpl.page(msg));
}
@@ -146,7 +145,7 @@
storeImpl.sync();
- Page<PageMessage> page = storeImpl.depage();
+ Page page = storeImpl.depage();
page.open();
@@ -173,7 +172,7 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- TestSupportPageStore<PageMessage> storeImpl = new PagingStoreImpl<PageMessage>(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 10);
+ TestSupportPageStore storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 10);
storeImpl.start();
@@ -200,9 +199,9 @@
}
- ServerMessage msg = createMessage(i+1l, destination, buffer);
+ PageMessage msg = createMessage(i+1l, destination, buffer);
- assertTrue(storeImpl.page(new PageMessage(msg)));
+ assertTrue(storeImpl.page(msg));
}
@@ -212,7 +211,7 @@
for (int pageNr = 0; pageNr < 2; pageNr++)
{
- Page<PageMessage> page = storeImpl.depage();
+ Page page = storeImpl.depage();
page.open();
@@ -233,11 +232,11 @@
assertTrue(storeImpl.isPaging());
- ServerMessage msg = createMessage(100, destination, buffers.get(0));
+ PageMessage msg = createMessage(100, destination, buffers.get(0));
- assertTrue(storeImpl.page(new PageMessage(msg)));
+ assertTrue(storeImpl.page(msg));
- Page<PageMessage> newPage = storeImpl.depage();
+ Page newPage = storeImpl.depage();
newPage.open();
@@ -253,13 +252,13 @@
assertFalse(storeImpl.isPaging());
- assertFalse(storeImpl.page(new PageMessage(msg)));
+ assertFalse(storeImpl.page(msg));
storeImpl.startPaging();
- assertTrue(storeImpl.page(new PageMessage(msg)));
+ assertTrue(storeImpl.page(msg));
- Page<PageMessage> page = storeImpl.depage();
+ Page page = storeImpl.depage();
page.open();
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-08-21 21:42:55 UTC (rev 4860)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-08-22 04:06:45 UTC (rev 4861)
@@ -80,9 +80,9 @@
final ConcurrentHashMap<Long, PageMessage> buffers = new ConcurrentHashMap<Long, PageMessage>();
- final ArrayList<Page<PageMessage>> readPages = new ArrayList<Page<PageMessage>>();
+ final ArrayList<Page> readPages = new ArrayList<Page>();
- final TestSupportPageStore<PageMessage> storeImpl = new PagingStoreImpl<PageMessage>(factory, new FakeManagerFactory(0), new SimpleString("test"), MAX_SIZE);
+ final TestSupportPageStore storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), new SimpleString("test"), MAX_SIZE);
storeImpl.start();
@@ -111,7 +111,7 @@
while (true)
{
long id = messageIdGenerator.incrementAndGet();
- PageMessage msg = new PageMessage(createMessage(id, destination, createRandomBuffer(5)));
+ PageMessage msg = createMessage(id, destination, createRandomBuffer(5));
if (storeImpl.page(msg))
{
buffers.put(id, msg);
@@ -198,7 +198,7 @@
final ConcurrentHashMap<Long, PageMessage> buffers2 = new ConcurrentHashMap<Long, PageMessage>();
- for (Page<PageMessage> page: readPages)
+ for (Page page: readPages)
{
page.open();
PageMessage msgs[] = page.read();
@@ -228,7 +228,7 @@
fileTmp.close();
}
- TestSupportPageStore<PageMessage> storeImpl2 = new PagingStoreImpl<PageMessage>(factory, new FakeManagerFactory(0), new SimpleString("test"), MAX_SIZE);
+ TestSupportPageStore storeImpl2 = new PagingStoreImpl(factory, new FakeManagerFactory(0), new SimpleString("test"), MAX_SIZE);
storeImpl2.start();
int numberOfPages = storeImpl2.getNumberOfPages();
@@ -244,15 +244,15 @@
assertEquals(numberOfPages, storeImpl2.getNumberOfPages());
long lastMessageId = messageIdGenerator.incrementAndGet();
- PageMessage lastMsg = new PageMessage(createMessage(lastMessageId, destination, createRandomBuffer(5)));
+ PageMessage lastMsg = createMessage(lastMessageId, destination, createRandomBuffer(5));
storeImpl2.page(lastMsg);
buffers2.put(lastMessageId, lastMsg);
- Page<PageMessage> lastPage = null;
+ Page lastPage = null;
while (true)
{
- Page<PageMessage> page = storeImpl2.depage();
+ Page page = storeImpl2.depage();
if (page == null)
{
break;
@@ -290,7 +290,7 @@
}
- protected ServerMessage createMessage(long messageId, SimpleString destination, ByteBuffer buffer)
+ protected PageMessage createMessage(long messageId, SimpleString destination, ByteBuffer buffer)
{
ServerMessage msg = new ServerMessageImpl((byte)1, true, 0,
System.currentTimeMillis(), (byte)0, new ByteBufferWrapper(buffer));
@@ -298,7 +298,7 @@
msg.setMessageID((long)messageId);
msg.setDestination(destination);
- return msg;
+ return new PageMessage(msg);
}
protected ByteBuffer createRandomBuffer(int size)
More information about the jboss-cvs-commits
mailing list