[jboss-cvs] JBoss Messaging SVN: r4860 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/journal/impl and 9 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Aug 21 17:42:55 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-08-21 17:42:55 -0400 (Thu, 21 Aug 2008)
New Revision: 4860
Added:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/DepageListener.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/fakes/
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/fakes/FakeManagerFactory.java
Modified:
branches/Branch_JBMESSAGING-1314/native/bin/libJBMLibAIO64.so
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
Making the journal less dependent on Messages
Modified: branches/Branch_JBMESSAGING-1314/native/bin/libJBMLibAIO64.so
===================================================================
(Binary files differ)
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -1964,6 +1964,7 @@
if (System.currentTimeMillis() - lastTime > 10000)
{
System.out.println("Clear!!!" + reuseBuffers.size());
+ lastTime = System.currentTimeMillis();
reuseBuffers.clear();
}
Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/DepageListener.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/DepageListener.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/DepageListener.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -0,0 +1,55 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging;
+
+import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * Callback that receives the elements read back from a page once a paging
+ * store has depaged it.
+ *
+ * NOTE(review): in this revision the DequeueThread of PagingStoreImpl invokes
+ * onDepage(storeName, page.read()) and does not look at the returned value;
+ * confirm whether depaging is meant to stop when false is returned.
+ *
+ * @param <T> the type of element stored in the pages
+ */
+public interface DepageListener<T extends EncodingSupport>
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /**
+ * Invoked with the elements read from one depaged page.
+ *
+ * @param destination name of the paging store the page was taken from
+ * @param data the elements read from that page
+ * @return false if the listener can't handle more pages
+ * @throws Exception any failure raised by the listener while handling the data
+ */
+ boolean onDepage(SimpleString destination, T[] data) throws Exception;
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java 2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -23,21 +23,25 @@
package org.jboss.messaging.core.paging;
-import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.journal.EncodingSupport;
/**
*
+ * A Page can store elements of any data type that implements EncodingSupport.
+ *
+ * Paging was written generically so that it is easier to reuse for other data types we may need later (such as ACKs, transactions, etc.).
+ *
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public interface Page
+public interface Page<T extends EncodingSupport>
{
int getPageId();
- void write(ServerMessage message) throws Exception;
+ void write(T message) throws Exception;
- ServerMessage[] read() throws Exception;
+ T[] read() throws Exception;
int getSize();
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.paging;
+import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.server.MessagingComponent;
import org.jboss.messaging.util.SimpleString;
@@ -32,7 +33,7 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
-public interface PagingManager extends MessagingComponent
+public interface PagingManager<T extends EncodingSupport> extends MessagingComponent
{
- public PagingStore getPageStore(SimpleString storeName) throws Exception;
+ public PagingStore<T> getPageStore(SimpleString storeName) throws Exception;
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -25,8 +25,8 @@
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.MessagingComponent;
-import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.core.journal.EncodingSupport;
/**
*
@@ -37,7 +37,7 @@
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public interface PagingStore extends MessagingComponent
+public interface PagingStore<T extends EncodingSupport> extends MessagingComponent
{
int getNumberOfPages();
@@ -51,7 +51,7 @@
void sync() throws Exception;
- boolean page(ServerMessage message) throws Exception;
+ boolean page(T message) throws Exception;
/**
* Remove the first page from the Writing Queue.
@@ -60,7 +60,7 @@
* @return
* @throws Exception
*/
- Page depage() throws Exception;
+ Page<T> depage() throws Exception;
/**
*
@@ -68,6 +68,6 @@
* @return false if a thread was already started, or if not in page mode
* @throws Exception
*/
- boolean startDequeueThread(PostOffice postOffice, long maxSize) throws Exception;
+ boolean startDequeueThread(DepageListener<T> listener) throws Exception;
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java 2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -23,16 +23,20 @@
package org.jboss.messaging.core.paging;
+import org.jboss.messaging.core.journal.EncodingSupport;
+
/**
* The integration point between the PagingManager and the file system (aka SequentialFiles)
*
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public interface PagingStoreFactory
+public interface PagingStoreFactory<T extends EncodingSupport>
{
- PagingStore newStore(org.jboss.messaging.util.SimpleString destinationName);
+ PagingStore<T> newStore(org.jboss.messaging.util.SimpleString destinationName);
+ T newElement();
+ T[] newArray(int size);
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java 2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -27,13 +27,13 @@
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
+import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.paging.PagingStoreFactory;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.impl.ServerMessageImpl;
import org.jboss.messaging.util.VariableLatch;
/**
@@ -41,7 +41,7 @@
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public class PageImpl implements Page
+public class PageImpl<T extends EncodingSupport> implements Page<T>
{
// Constants -----------------------------------------------------
@@ -52,7 +52,7 @@
private static final int SIZE_BYTE = 1;
- public static final int SIZE_RECORD = SIZE_LONG + SIZE_INTEGER + SIZE_BYTE + SIZE_BYTE;
+ public static final int SIZE_RECORD = SIZE_BYTE + SIZE_INTEGER + SIZE_BYTE;
public static final byte START_BYTE= (byte)'{';
public static final byte END_BYTE= (byte)'}';
@@ -60,6 +60,7 @@
// Attributes ----------------------------------------------------
private final int pageId;
+ private final PagingStoreFactory<T> storeFactory;
private final AtomicInteger numberOfMessages = new AtomicInteger(0);
private final SequentialFile file;
private final SequentialFileFactory fileFactory;
@@ -70,11 +71,12 @@
// Constructors --------------------------------------------------
- public PageImpl(final SequentialFileFactory factory, final SequentialFile file, int pageId) throws Exception
+ public PageImpl(final SequentialFileFactory factory, final SequentialFile file, PagingStoreFactory<T> storeFactory, final int pageId) throws Exception
{
this.pageId = pageId;
this.file = file;
this.fileFactory = factory;
+ this.storeFactory = storeFactory;
if (factory.isSupportsCallbacks())
{
callback = new PagingCallback();
@@ -97,10 +99,10 @@
return pageId;
}
- public ServerMessage[] read() throws Exception
+ public T[] read() throws Exception
{
- ArrayList<ServerMessage> messages = new ArrayList<ServerMessage>();
+ ArrayList<T> messages = new ArrayList<T>();
ByteBuffer buffer = fileFactory.newBuffer((int)file.size());
file.position(0);
@@ -122,8 +124,7 @@
int oldPos = buffer.position();
if (buffer.position() + messageSize < buffer.limit() && buffer.get(oldPos + messageSize) == END_BYTE)
{
- ServerMessage msg = instantiateObject();
- msg.setMessageID(buffer.getLong());
+ T msg = instantiateObject();
msg.decode(messageBuffer);
messages.add(msg);
}
@@ -144,12 +145,11 @@
return messages.toArray(instantiateArray(messages.size()));
}
- public void write(final ServerMessage message) throws Exception
+ public void write(final T message) throws Exception
{
ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + SIZE_RECORD);
buffer.put(START_BYTE);
- buffer.putInt(message.getEncodeSize() + SIZE_LONG);
- buffer.putLong(message.getMessageID());
+ buffer.putInt(message.getEncodeSize());
message.encode(new ByteBufferWrapper(buffer));
buffer.put(END_BYTE);
buffer.rewind();
@@ -212,14 +212,15 @@
// Protected -----------------------------------------------------
- protected ServerMessage instantiateObject()
+ protected T instantiateObject()
{
- return new ServerMessageImpl();
+ return storeFactory.newElement();
}
+
- protected ServerMessage[] instantiateArray(int size)
+ protected T[] instantiateArray(final int size)
{
- return new ServerMessageImpl[size];
+ return storeFactory.newArray(size);
}
// Private -------------------------------------------------------
Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessage.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -0,0 +1,111 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging.impl;
+
+import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+
+/**
+ *
+ * Wraps a ServerMessage together with a transaction ID so that both can be
+ * written to, and read back from, a page as a single EncodingSupport element.
+ *
+ * Encoded layout: transaction ID (long), message ID (long), then the bytes
+ * produced by the wrapped message's own encode().
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class PageMessage implements EncodingSupport
+{
+
+   // Constants -----------------------------------------------------
+
+   /** Number of bytes occupied by each long written ahead of the message body. */
+   private static final int SIZE_LONG = 8;
+
+   // Attributes ----------------------------------------------------
+
+   /** The wrapped message; decode() fills this same instance in place. */
+   private final ServerMessage message;
+
+   /** Not final because decode() overwrites it with the value read from the buffer. */
+   private long transactionID;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param message the message to wrap
+    * @param transactionID the transaction ID to store alongside the message
+    */
+   public PageMessage(final ServerMessage message, final long transactionID)
+   {
+      this.message = message;
+      // Fix: the argument used to be silently dropped, so every instance was
+      // encoded with a transaction ID of 0 regardless of what the caller passed.
+      this.transactionID = transactionID;
+   }
+
+   /**
+    * Wraps a message with the transaction ID left at 0, the value the field
+    * defaulted to before this constructor delegated.
+    *
+    * @param message the message to wrap
+    */
+   public PageMessage(final ServerMessage message)
+   {
+      this(message, 0);
+   }
+
+   /**
+    * Creates an instance around a fresh ServerMessageImpl, ready to be
+    * populated through decode().
+    */
+   public PageMessage()
+   {
+      this(new ServerMessageImpl());
+   }
+
+   // Public --------------------------------------------------------
+
+   /** @return the wrapped message */
+   public ServerMessage getMessage()
+   {
+      return message;
+   }
+
+   /** @return the transaction ID given at construction or read by the last decode() */
+   public long getTransactionID()
+   {
+      return transactionID;
+   }
+
+   // EncodingSupport implementation --------------------------------
+
+   /**
+    * Reads the transaction ID, the message ID and then the message body from
+    * the buffer, in the order encode() wrote them.
+    */
+   public void decode(MessagingBuffer buffer)
+   {
+      transactionID = buffer.getLong();
+      final long messageID = buffer.getLong();
+      message.decode(buffer);
+      // The ID is stored outside the message body, so it is applied once the body has been decoded.
+      message.setMessageID(messageID);
+   }
+
+   /**
+    * Writes the transaction ID, the message ID and then the message body.
+    */
+   public void encode(MessagingBuffer buffer)
+   {
+      buffer.putLong(transactionID);
+      buffer.putLong(message.getMessageID());
+      message.encode(buffer);
+   }
+
+   /**
+    * @return the two leading longs (transaction ID and message ID) plus the
+    *         wrapped message's own encoded size
+    */
+   public int getEncodeSize()
+   {
+      return SIZE_LONG + SIZE_LONG + message.getEncodeSize();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java 2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -25,6 +25,7 @@
import java.io.File;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.paging.PagingStoreFactory;
@@ -36,7 +37,7 @@
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public class PagingManagerFactoryNIO implements PagingStoreFactory
+public class PagingManagerFactoryNIO implements PagingStoreFactory<PageMessage>
{
// Constants -----------------------------------------------------
@@ -58,19 +59,34 @@
// Public --------------------------------------------------------
- public PagingStore newStore(SimpleString destinationName)
+ public PagingStore<PageMessage> newStore(SimpleString destinationName)
{
final String destinationDirectory = directory + "/" + destinationName.toString();
File destinationFile = new File(destinationDirectory);
destinationFile.mkdirs();
- return new PagingStoreImpl(new NIOSequentialFileFactory(destinationDirectory), destinationName, pageSize);
+ return new PagingStoreImpl<PageMessage>(newFileFactory(destinationDirectory), this, destinationName, pageSize);
}
+
+ public PageMessage[] newArray(int size)
+ {
+ return new PageMessage[size];
+ }
+
+ public PageMessage newElement()
+ {
+ return new PageMessage();
+ }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
+ protected SequentialFileFactory newFileFactory(final String destinationDirectory)
+ {
+ return new NIOSequentialFileFactory(destinationDirectory);
+ }
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -26,6 +26,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.paging.PagingStoreFactory;
@@ -36,7 +37,7 @@
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public class PagingManagerImpl implements PagingManager
+public class PagingManagerImpl<T extends EncodingSupport> implements PagingManager<T>
{
// Constants -----------------------------------------------------
@@ -44,32 +45,32 @@
private volatile boolean started = false;
- private final ConcurrentMap<SimpleString, PagingStore> stores = new ConcurrentHashMap<SimpleString, PagingStore>();
+ private final ConcurrentMap<SimpleString, PagingStore<T>> stores = new ConcurrentHashMap<SimpleString, PagingStore<T>>();
- private final PagingStoreFactory pagingSPI;
+ private final PagingStoreFactory<T> pagingSPI;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public PagingManagerImpl(final PagingStoreFactory pagingSPI)
+ public PagingManagerImpl(final PagingStoreFactory<T> pagingSPI)
{
this.pagingSPI = pagingSPI;
}
// Public --------------------------------------------------------
- public PagingStore getPageStore(final SimpleString storeName) throws Exception
+ public PagingStore<T> getPageStore(final SimpleString storeName) throws Exception
{
validateStarted();
- PagingStore store = stores.get(storeName);
+ PagingStore<T> store = stores.get(storeName);
if (store == null)
{
store = newStore(storeName);
- PagingStore oldStore = stores.putIfAbsent(storeName, store);
+ PagingStore<T> oldStore = stores.putIfAbsent(storeName, store);
if (oldStore != null)
{
@@ -98,7 +99,7 @@
{
this.started = false;
- for (PagingStore store: stores.values())
+ for (PagingStore<T> store: stores.values())
{
store.stop();
}
@@ -111,7 +112,7 @@
// Private -------------------------------------------------------
- private PagingStore newStore(final SimpleString destinationName)
+ private PagingStore<T> newStore(final SimpleString destinationName)
{
return pagingSPI.newStore(destinationName);
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -30,13 +30,13 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.DepageListener;
import org.jboss.messaging.core.paging.Page;
-import org.jboss.messaging.core.paging.PagingStore;
-import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.paging.PagingStoreFactory;
import org.jboss.messaging.util.SimpleString;
/**
@@ -44,7 +44,7 @@
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public class PagingStoreImpl implements PagingStore, TestSupportPageStore
+public class PagingStoreImpl<T extends EncodingSupport> implements TestSupportPageStore<T>
{
// Constants -----------------------------------------------------
@@ -58,16 +58,19 @@
private final SimpleString storeName;
- private final SequentialFileFactory factory;
+ private final PagingStoreFactory<T> storeFactory;
+ private final SequentialFileFactory fileFactory;
+
private final long maxPageSize;
private volatile Thread dequeueThread;
+
private volatile int numberOfPages;
private volatile int firstPageId = Integer.MAX_VALUE;
private volatile int currentPageId;
- private volatile Page currentPage;
+ private volatile Page<T> currentPage;
// This is supposed to perform better than synchronized methods
private final Semaphore globalLock = new Semaphore(1);
@@ -81,9 +84,10 @@
// Constructors --------------------------------------------------
- public PagingStoreImpl(final SequentialFileFactory factory, final SimpleString storeName, final long maxPageSize)
+ public PagingStoreImpl(final SequentialFileFactory fileFactory, PagingStoreFactory<T> storeFactory, final SimpleString storeName, final long maxPageSize)
{
- this.factory = factory;
+ this.storeFactory = storeFactory;
+ this.fileFactory = fileFactory;
this.storeName = storeName;
this.maxPageSize = maxPageSize;
}
@@ -117,7 +121,7 @@
return storeName;
}
- public Page depage() throws Exception
+ public Page<T> depage() throws Exception
{
validateInit();
@@ -136,7 +140,7 @@
numberOfPages--;
- final Page returnPage;
+ final Page<T> returnPage;
if (currentPageId == firstPageId)
{
firstPageId = Integer.MAX_VALUE;
@@ -184,11 +188,11 @@
}
- public boolean page(ServerMessage message) throws Exception
+ public boolean page(T message) throws Exception
{
validateInit();
- int bytesToWrite = factory.calculateBlockSize(message.getEncodeSize() + PageImpl.SIZE_RECORD);
+ int bytesToWrite = fileFactory.calculateBlockSize(message.getEncodeSize() + PageImpl.SIZE_RECORD);
// This would be a synchronized block... (but using a Semaphore)
@@ -262,7 +266,7 @@
}
}
- public boolean startDequeueThread(final PostOffice postOffice, final long maxSize) throws Exception
+ public boolean startDequeueThread(final DepageListener<T> listener) throws Exception
{
if (!isPaging())
{
@@ -274,7 +278,7 @@
{
if (this.dequeueThread == null)
{
- this.dequeueThread = new DequeueThread(postOffice, maxSize);
+ this.dequeueThread = new DequeueThread(listener);
this.dequeueThread.start();
return true;
}
@@ -336,7 +340,7 @@
try
{
- List<String> files = factory.listFiles("page");
+ List<String> files = fileFactory.listFiles("page");
numberOfPages = files.size();
@@ -449,16 +453,16 @@
}
- private Page createPage(int page) throws Exception
+ private Page<T> createPage(int page) throws Exception
{
String fileName = createFileName(page);
- SequentialFile file = factory.createSequentialFile(fileName, 1000);
+ SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
file.open();
long size = file.size();
- if (factory.isSupportsCallbacks() && size < maxPageSize)
+ if (fileFactory.isSupportsCallbacks() && size < maxPageSize)
{
file.fill((int)size, (int)maxPageSize - (int)size, (byte)0);
}
@@ -467,7 +471,7 @@
file.close();
- return new PageImpl(factory, file, page);
+ return new PageImpl<T>(fileFactory, file, storeFactory, page);
}
/**
@@ -499,13 +503,11 @@
class DequeueThread extends Thread
{
- final PostOffice postOffice;
- final long maxSize;
+ final DepageListener<T> listener;
- public DequeueThread(final PostOffice postOffice, final long maxSize)
+ public DequeueThread(final DepageListener<T> listener)
{
- this.postOffice = postOffice;
- this.maxSize = maxSize;
+ this.listener = listener;
}
@@ -513,21 +515,20 @@
{
try
{
- while (postOffice.getSize(storeName) < maxSize)
+ boolean needMorePages = false;
+ do
{
- Page page = depage();
+ Page<T> page = depage();
if (page == null)
{
break;
}
page.open();
- ServerMessage messages[] = page.read();
- for (ServerMessage message: messages)
- {
- postOffice.routeAndDeliver(message);
- }
+ T messages[] = page.read();
+ listener.onDepage(PagingStoreImpl.this.storeName, messages);
page.delete();
}
+ while (needMorePages);
}
catch (Exception e)
{
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java 2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -25,12 +25,14 @@
import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.journal.EncodingSupport;
+
/**
* All the methods required by test cases on PagingStoreImpl
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public interface TestSupportPageStore extends PagingStore
+public interface TestSupportPageStore<T extends EncodingSupport> extends PagingStore<T>
{
void forceAnotherPage() throws Exception;
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -109,6 +109,4 @@
/** To be called when a rollback is called after messageDone was called */
long addSize(ServerMessage message) throws Exception;
- long getSize(SimpleString destination) throws Exception;
-
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -37,8 +37,10 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.DepageListener;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.impl.PageMessage;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.FlowController;
@@ -85,12 +87,12 @@
private final StorageManager storageManager;
- private final PagingManager pagingManager;
+ private final PagingManager<PageMessage> pagingManager;
private volatile boolean started;
public PostOfficeImpl(final StorageManager storageManager,
- final PagingManager pagingManager,
+ final PagingManager<PageMessage> pagingManager,
final QueueFactory queueFactory, final boolean checkAllowable)
{
this.storageManager = storageManager;
@@ -317,12 +319,9 @@
if (size < MAX_SIZE)
{
- PagingStore manager = pagingManager.getPageStore(message.getDestination());
- if (manager.startDequeueThread(this, MAX_SIZE))
- {
- log.info("Starting dequeing page for " + message.getDestination());
- }
+ PagingStore<PageMessage> store = pagingManager.getPageStore(message.getDestination());
+ startDepageThread(store);
}
}
@@ -335,7 +334,7 @@
public boolean page(ServerMessage message) throws Exception
{
- return pagingManager.getPageStore(message.getDestination()).page(message);
+ return pagingManager.getPageStore(message.getDestination()).page(new PageMessage(message));
}
public Map<SimpleString, List<Binding>> getMappings()
@@ -478,8 +477,28 @@
for (SimpleString destination: dests)
{
- PagingStore store = pagingManager.getPageStore(destination);
- store.startDequeueThread(this, MAX_SIZE);
+ PagingStore<PageMessage> store = pagingManager.getPageStore(destination);
+ startDepageThread(store);
}
}
+ /** Starts the dequeue thread of the given store, passing it a new PagingListener as the depage listener. */
+ private void startDepageThread(PagingStore<PageMessage> store) throws Exception
+ {
+ store.startDequeueThread(new PagingListener());
+ }
+
+ /** DepageListener that hands every depaged message to routeAndDeliver of the enclosing PostOfficeImpl. */
+ private class PagingListener implements DepageListener<PageMessage>
+ {
+ /** Routes each message in data, then returns whether the destination's queue size is below MAX_SIZE. NOTE(review): presumably false stops further depaging - confirm against PagingStoreImpl. */
+ public boolean onDepage(final SimpleString destination, final PageMessage[] data) throws Exception
+ {
+ for (PageMessage msg: data)
+ {
+ routeAndDeliver(msg.getMessage());
+ }
+ return PostOfficeImpl.this.getQueueSize(destination).get() < MAX_SIZE;
+ }
+
+ }
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -35,6 +35,7 @@
import org.jboss.messaging.core.management.impl.MessagingServerManagementImpl;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStoreFactory;
+import org.jboss.messaging.core.paging.impl.PageMessage;
import org.jboss.messaging.core.paging.impl.PagingManagerFactoryNIO;
import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
import org.jboss.messaging.core.persistence.StorageManager;
@@ -171,7 +172,7 @@
scheduledExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), new JBMThreadFactory("JBM-scheduled-threads"));
queueFactory = new QueueFactoryImpl(scheduledExecutor, queueSettingsRepository);
- PagingManager pagingManager = new PagingManagerImpl(new PagingManagerFactoryNIO(configuration.getPagingDirectory(), configuration.getPageFileSize()));
+ PagingManager<PageMessage> pagingManager = new PagingManagerImpl<PageMessage>(new PagingManagerFactoryNIO(configuration.getPagingDirectory(), configuration.getPageFileSize()));
postOffice = new PostOfficeImpl(storageManager, pagingManager,
queueFactory, configuration.isRequireDestinations());
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -28,8 +28,9 @@
import org.jboss.messaging.core.paging.Page;
import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.impl.PageMessage;
+import org.jboss.messaging.core.paging.impl.PagingManagerFactoryNIO;
import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
-import org.jboss.messaging.core.paging.impl.PagingManagerFactoryNIO;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
@@ -59,37 +60,37 @@
public void testPagingManagerNIO() throws Exception
{
- PagingManagerImpl managerImpl =
- new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir, 1024*1024));
+ PagingManagerImpl<PageMessage> managerImpl =
+ new PagingManagerImpl<PageMessage>(new PagingManagerFactoryNIO(journalDir, 1024*1024));
managerImpl.start();
- PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test"));
+ PagingStore<PageMessage> store = managerImpl.getPageStore(new SimpleString("simple-test"));
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
- assertFalse(store.page(msg));
+ assertFalse(store.page(new PageMessage(msg)));
store.startPaging();
- assertTrue(store.page(msg));
+ assertTrue(store.page(new PageMessage(msg)));
- Page page = store.depage();
+ Page<PageMessage> page = store.depage();
page.open();
- ServerMessage msgs[] = page.read();
+ PageMessage msgs[] = page.read();
page.close();
assertEquals(1, msgs.length);
- assertEqualsByteArrays(msg.getBody().array(), msgs[0].getBody().array());
+ assertEqualsByteArrays(msg.getBody().array(), msgs[0].getMessage().getBody().array());
assertTrue(store.isPaging());
assertNull(store.depage());
- assertFalse(store.page(msg));
+ assertFalse(store.page(new PageMessage(msg)));
}
// Package protected ---------------------------------------------
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java 2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -63,7 +63,7 @@
{
recreateDirectory();
System.out.println("Test " + i);
- testConcurrentPaging(new NIOSequentialFileFactory(journalDir), 10);
+ testConcurrentPaging(new NIOSequentialFileFactory(journalDir), 1);
}
}
Added: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/fakes/FakeManagerFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/fakes/FakeManagerFactory.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/fakes/FakeManagerFactory.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -0,0 +1,63 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.unit.core.paging.fakes;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.paging.impl.PagingManagerFactoryNIO;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+/** PagingManagerFactoryNIO subclass for unit tests; it overrides newFileFactory to return a FakeSequentialFileFactory. */
+public class FakeManagerFactory extends PagingManagerFactoryNIO
+{
+ /** Calls the PagingManagerFactoryNIO constructor with "" as its first argument and the given pageSize. */
+ public FakeManagerFactory(long pageSize)
+ {
+ super("", pageSize);
+ }
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ /** Ignores destinationDirectory and always returns a new FakeSequentialFileFactory. */
+ @Override
+ protected SequentialFileFactory newFileFactory(String destinationDirectory)
+ {
+ return new FakeSequentialFileFactory();
+ }
+
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java 2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -23,16 +23,6 @@
package org.jboss.messaging.tests.unit.core.paging.impl;
-import java.nio.ByteBuffer;
-
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.jboss.messaging.core.journal.IOCallback;
-import org.jboss.messaging.core.journal.SequentialFile;
-import org.jboss.messaging.core.journal.SequentialFileFactory;
-import org.jboss.messaging.core.paging.impl.PageImpl;
-import org.jboss.messaging.core.remoting.MessagingBuffer;
-import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
@@ -66,81 +56,7 @@
- public void testEasyMockPageWithCallback() throws Exception
- {
- testEasyMockOnPage(true);
- }
-
- public void testEasyMockPageWithoutCallback() throws Exception
- {
- testEasyMockOnPage(false);
- }
-
- private void testEasyMockOnPage(boolean callback) throws Exception
- {
- SequentialFileFactory factory = EasyMock.createMock(SequentialFileFactory.class);
-
- EasyMock.expect(factory.isSupportsCallbacks()).andStubReturn(callback);
-
- SequentialFile file = EasyMock.createStrictMock(SequentialFile.class);
-
- EasyMock.replay(factory, file);
-
- PageImpl impl = new PageImpl(factory, file, 1);
-
- EasyMock.verify(factory, file);
-
- EasyMock.reset(factory, file);
-
-
- EasyMock.expect(factory.newBuffer(EasyMock.anyInt())).andStubAnswer(new IAnswer<ByteBuffer>() {
-
- public ByteBuffer answer() throws Throwable
- {
- int size = (Integer)EasyMock.getCurrentArguments()[0];
- return ByteBuffer.allocate(size);
- }});
-
- ServerMessage msg = EasyMock.createMock(ServerMessage.class);
- EasyMock.expect(msg.getMessageID()).andStubReturn(1l);
- EasyMock.expect(msg.getEncodeSize()).andStubReturn(2);
- msg.encode(EasyMock.isA(MessagingBuffer.class));
- EasyMock.expectLastCall().andAnswer(new IAnswer<Object>(){
-
- public Object answer() throws Throwable
- {
- MessagingBuffer buffer = (MessagingBuffer)EasyMock.getCurrentArguments()[0];
- buffer.putByte((byte)5);
- buffer.putByte((byte)6);
- return null;
- }});
-
- final byte [] expectedBytes = autoEncode((byte)'{', (int)10, (long)1, (byte)5, (byte)6, (byte)'}');
-
- if (callback)
- {
- EasyMock.expect(file.write(compareByteBuffer(expectedBytes), EasyMock.isA(IOCallback.class))).andAnswer(new IAnswer<Integer>(){
-
- public Integer answer() throws Throwable
- {
- IOCallback callback = (IOCallback)EasyMock.getCurrentArguments()[1];
- callback.done();
- return expectedBytes.length;
- }});
- }
- else
- {
- EasyMock.expect(file.write(compareByteBuffer(expectedBytes), EasyMock.eq(false))).andReturn(expectedBytes.length);
- }
-
- EasyMock.replay(factory, file, msg);
-
- impl.write(msg);
-
- EasyMock.verify(factory, file, msg);
-
- }
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java 2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -29,9 +29,11 @@
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.paging.impl.PageImpl;
+import org.jboss.messaging.core.paging.impl.PageMessage;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.tests.unit.core.paging.fakes.FakeManagerFactory;
import org.jboss.messaging.tests.util.RandomUtil;
import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
@@ -64,7 +66,7 @@
SequentialFile file = factory.createSequentialFile("00010.page", 1);
- PageImpl impl = new PageImpl(factory, file, 10);
+ PageImpl<PageMessage> impl = new PageImpl<PageMessage>(factory, file, new FakeManagerFactory(1024), 10);
assertEquals(10, impl.getPageId());
@@ -94,7 +96,7 @@
msg.setDestination(simpleDestination);
- impl.write(msg);
+ impl.write(new PageMessage(msg));
assertEquals(i + 1, impl.getNumberOfMessages());
}
@@ -104,9 +106,9 @@
file = factory.createSequentialFile("00010.page", 1);
file.open();
- impl = new PageImpl(factory, file, 10);
+ impl = new PageImpl<PageMessage>(factory, file, new FakeManagerFactory(1024), 10);
- ServerMessage msgs[] = impl.read();
+ PageMessage msgs[] = impl.read();
assertEquals(numberOfElements, msgs.length);
@@ -114,11 +116,11 @@
for (int i = 0; i < msgs.length; i++)
{
- assertEquals((long)i, msgs[i].getMessageID());
+ assertEquals((long)i, msgs[i].getMessage().getMessageID());
- assertEquals(simpleDestination, msgs[i].getDestination());
+ assertEquals(simpleDestination, msgs[i].getMessage().getDestination());
- assertEqualsByteArrays(buffers.get(i).array(), msgs[i].getBody().array());
+ assertEqualsByteArrays(buffers.get(i).array(), msgs[i].getMessage().getBody().array());
}
impl.delete();
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -32,6 +32,7 @@
import org.jboss.messaging.core.paging.PagingStoreFactory;
import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
+import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
@@ -104,7 +105,7 @@
public void testMultipleThreadsGetStore() throws Exception
{
- PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
+ PagingStoreFactory<ServerMessage> spi = EasyMock.createMock(PagingStoreFactory.class);
final PagingManagerImpl manager = new PagingManagerImpl(spi);
final SimpleString destination = new SimpleString("some-destination");
@@ -113,7 +114,7 @@
EasyMock.expect(factory.listFiles(EasyMock.isA(String.class))).andStubReturn(new ArrayList<String>());
- PagingStoreImpl storeImpl = new PagingStoreImpl(factory, destination, 1);
+ PagingStoreImpl<ServerMessage> storeImpl = new PagingStoreImpl<ServerMessage>(factory, spi, destination, 1);
EasyMock.expect(spi.newStore(destination)).andStubReturn(storeImpl);
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -30,10 +30,12 @@
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.paging.Page;
import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.impl.PageMessage;
import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.jboss.messaging.tests.unit.core.paging.fakes.FakeManagerFactory;
import org.jboss.messaging.util.SimpleString;
/**
@@ -60,7 +62,7 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 2);
+ PagingStore storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 2);
storeImpl.start();
@@ -76,7 +78,7 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 2);
+ PagingStore storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 2);
storeImpl.start();
@@ -103,7 +105,7 @@
storeImpl.sync();
- storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 2);
+ storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 2);
storeImpl.start();
@@ -115,7 +117,7 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 10);
+ PagingStore<PageMessage> storeImpl = new PagingStoreImpl<PageMessage>(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 10);
storeImpl.start();
@@ -136,7 +138,7 @@
ServerMessage msg = createMessage(i+1l, destination, buffer);
- assertTrue(storeImpl.page(msg));
+ assertTrue(storeImpl.page(new PageMessage(msg, 0l)));
}
@@ -144,11 +146,11 @@
storeImpl.sync();
- Page page = storeImpl.depage();
+ Page<PageMessage> page = storeImpl.depage();
page.open();
- ServerMessage msg[] = page.read();
+ PageMessage msg[] = page.read();
assertEquals(10, msg.length);
assertEquals(1, storeImpl.getNumberOfPages());
@@ -161,8 +163,8 @@
for (int i = 0; i < 10; i++)
{
- assertEquals(i + 1l, msg[i].getMessageID());
- assertEqualsByteArrays(buffers.get(i).array(), msg[i].getBody().array());
+ assertEquals(i + 1l, msg[i].getMessage().getMessageID());
+ assertEqualsByteArrays(buffers.get(i).array(), msg[i].getMessage().getBody().array());
}
}
@@ -171,7 +173,7 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- TestSupportPageStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 10);
+ TestSupportPageStore<PageMessage> storeImpl = new PagingStoreImpl<PageMessage>(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 10);
storeImpl.start();
@@ -200,7 +202,7 @@
ServerMessage msg = createMessage(i+1l, destination, buffer);
- assertTrue(storeImpl.page(msg));
+ assertTrue(storeImpl.page(new PageMessage(msg)));
}
@@ -210,11 +212,11 @@
for (int pageNr = 0; pageNr < 2; pageNr++)
{
- Page page = storeImpl.depage();
+ Page<PageMessage> page = storeImpl.depage();
page.open();
- ServerMessage msg[] = page.read();
+ PageMessage msg[] = page.read();
page.close();
@@ -222,8 +224,8 @@
for (int i = 0; i < 5; i++)
{
- assertEquals(pageNr*5 + i + 1l, msg[i].getMessageID());
- assertEqualsByteArrays(buffers.get(pageNr*5 + i).array(), msg[i].getBody().array());
+ assertEquals(pageNr*5 + i + 1l, msg[i].getMessage().getMessageID());
+ assertEqualsByteArrays(buffers.get(pageNr*5 + i).array(), msg[i].getMessage().getBody().array());
}
}
@@ -233,9 +235,9 @@
ServerMessage msg = createMessage(100, destination, buffers.get(0));
- assertTrue(storeImpl.page(msg));
+ assertTrue(storeImpl.page(new PageMessage(msg)));
- Page newPage = storeImpl.depage();
+ Page<PageMessage> newPage = storeImpl.depage();
newPage.open();
@@ -251,23 +253,23 @@
assertFalse(storeImpl.isPaging());
- assertFalse(storeImpl.page(msg));
+ assertFalse(storeImpl.page(new PageMessage(msg)));
storeImpl.startPaging();
- assertTrue(storeImpl.page(msg));
+ assertTrue(storeImpl.page(new PageMessage(msg)));
- Page page = storeImpl.depage();
+ Page<PageMessage> page = storeImpl.depage();
page.open();
- ServerMessage msgs[] = page.read();
+ PageMessage msgs[] = page.read();
assertEquals(1, msgs.length);
- assertEquals(100l, msgs[0].getMessageID());
+ assertEquals(100l, msgs[0].getMessage().getMessageID());
- assertEqualsByteArrays(buffers.get(0).array(), msgs[0].getBody().array());
+ assertEqualsByteArrays(buffers.get(0).array(), msgs[0].getMessage().getBody().array());
assertEquals(1, storeImpl.getNumberOfPages());
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-08-21 14:55:00 UTC (rev 4859)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-08-21 21:42:55 UTC (rev 4860)
@@ -34,11 +34,13 @@
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.paging.impl.PageMessage;
import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.tests.unit.core.paging.fakes.FakeManagerFactory;
import org.jboss.messaging.tests.util.RandomUtil;
import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
@@ -76,11 +78,11 @@
final CountDownLatch latchStart = new CountDownLatch(numberOfThreads);
- final ConcurrentHashMap<Long, ServerMessage> buffers = new ConcurrentHashMap<Long, ServerMessage>();
+ final ConcurrentHashMap<Long, PageMessage> buffers = new ConcurrentHashMap<Long, PageMessage>();
- final ArrayList<Page> readPages = new ArrayList<Page>();
+ final ArrayList<Page<PageMessage>> readPages = new ArrayList<Page<PageMessage>>();
- final TestSupportPageStore storeImpl = new PagingStoreImpl(factory, new SimpleString("test"), MAX_SIZE);
+ final TestSupportPageStore<PageMessage> storeImpl = new PagingStoreImpl<PageMessage>(factory, new FakeManagerFactory(0), new SimpleString("test"), MAX_SIZE);
storeImpl.start();
@@ -109,7 +111,7 @@
while (true)
{
long id = messageIdGenerator.incrementAndGet();
- ServerMessage msg = createMessage(id, destination, createRandomBuffer(5));
+ PageMessage msg = new PageMessage(createMessage(id, destination, createRandomBuffer(5)));
if (storeImpl.page(msg))
{
buffers.put(id, msg);
@@ -194,21 +196,21 @@
System.out.println("Reading " + buffers.size() + " messages, " + readPages.size() + " pages");
- final ConcurrentHashMap<Long, ServerMessage> buffers2 = new ConcurrentHashMap<Long, ServerMessage>();
+ final ConcurrentHashMap<Long, PageMessage> buffers2 = new ConcurrentHashMap<Long, PageMessage>();
- for (Page page: readPages)
+ for (Page<PageMessage> page: readPages)
{
page.open();
- ServerMessage msgs[] = page.read();
+ PageMessage msgs[] = page.read();
page.close();
- for (ServerMessage msg : msgs)
+ for (PageMessage msg : msgs)
{
- ServerMessage msgWritten = buffers.remove(msg.getMessageID());
- buffers2.put(msg.getMessageID(), msg);
+ PageMessage msgWritten = buffers.remove(msg.getMessage().getMessageID());
+ buffers2.put(msg.getMessage().getMessageID(), msg);
assertNotNull(msgWritten);
- assertEquals (msg.getDestination(), msgWritten.getDestination());
- assertEqualsByteArrays(msgWritten.getBody().array(), msg.getBody().array());
+ assertEquals (msg.getMessage().getDestination(), msgWritten.getMessage().getDestination());
+ assertEqualsByteArrays(msgWritten.getMessage().getBody().array(), msg.getMessage().getBody().array());
}
}
@@ -226,7 +228,7 @@
fileTmp.close();
}
- TestSupportPageStore storeImpl2 = new PagingStoreImpl(factory, new SimpleString("test"), MAX_SIZE);
+ TestSupportPageStore<PageMessage> storeImpl2 = new PagingStoreImpl<PageMessage>(factory, new FakeManagerFactory(0), new SimpleString("test"), MAX_SIZE);
storeImpl2.start();
int numberOfPages = storeImpl2.getNumberOfPages();
@@ -242,15 +244,15 @@
assertEquals(numberOfPages, storeImpl2.getNumberOfPages());
long lastMessageId = messageIdGenerator.incrementAndGet();
- ServerMessage lastMsg = createMessage(lastMessageId, destination, createRandomBuffer(5));
+ PageMessage lastMsg = new PageMessage(createMessage(lastMessageId, destination, createRandomBuffer(5)));
storeImpl2.page(lastMsg);
buffers2.put(lastMessageId, lastMsg);
- Page lastPage = null;
+ Page<PageMessage> lastPage = null;
while (true)
{
- Page page = storeImpl2.depage();
+ Page<PageMessage> page = storeImpl2.depage();
if (page == null)
{
break;
@@ -260,28 +262,28 @@
page.open();
- ServerMessage[] msgs = page.read();
+ PageMessage[] msgs = page.read();
page.close();
- for (ServerMessage msg: msgs)
+ for (PageMessage msg: msgs)
{
- ServerMessage msgWritten = buffers2.remove(msg.getMessageID());
+ PageMessage msgWritten = buffers2.remove(msg.getMessage().getMessageID());
assertNotNull(msgWritten);
- assertEquals (msg.getDestination(), msgWritten.getDestination());
- assertEqualsByteArrays(msgWritten.getBody().array(), msg.getBody().array());
+ assertEquals (msg.getMessage().getDestination(), msgWritten.getMessage().getDestination());
+ assertEqualsByteArrays(msgWritten.getMessage().getBody().array(), msg.getMessage().getBody().array());
}
}
lastPage.open();
- ServerMessage lastMessages[] = lastPage.read();
+ PageMessage lastMessages[] = lastPage.read();
lastPage.close();
assertEquals(1, lastMessages.length);
- assertEquals(lastMessages[0].getMessageID(), lastMessageId);
- assertEqualsByteArrays(lastMessages[0].getBody().array(), lastMsg.getBody().array());
+ assertEquals(lastMessages[0].getMessage().getMessageID(), lastMessageId);
+ assertEqualsByteArrays(lastMessages[0].getMessage().getBody().array(), lastMsg.getMessage().getBody().array());
assertEquals(0, buffers2.size());
More information about the jboss-cvs-commits
mailing list